qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [1/2] qpid-broker-j git commit: QPID-7649: [Java Broker] [AMQP1.0] Add support for Attach with incomplete-unsettled
Date Fri, 30 Jun 2017 16:19:53 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 72ed1aa52 -> 737c52807


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
new file mode 100644
index 0000000..e85cbc3
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.link;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.typeCompatibleWith;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Response;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+
+public class ResumeDeliveriesTest extends ProtocolTestBase
+{
+    private static final int MIN_MAX_FRAME_SIZE = 512;
+    private static final String TEST_MESSAGE_CONTENT = "foo";
+    private InetSocketAddress _brokerAddress;
+    private String _originalMmsMessageStorePersistence;
+
+    @Before
+    public void setUp()
+    {
+        _originalMmsMessageStorePersistence = System.getProperty("qpid.tests.mms.messagestore.persistence");
+        System.setProperty("qpid.tests.mms.messagestore.persistence", "false");
+
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (_originalMmsMessageStorePersistence != null)
+        {
+            System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence);
+        }
+        else
+        {
+            System.clearProperty("qpid.tests.mms.messagestore.persistence");
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.13",
+            description = "When a suspended link having unsettled deliveries is resumed,
the unsettled field from the"
+                          + " attach frame will carry the delivery-tags and delivery state
of all deliveries"
+                          + " considered unsettled by the issuing link endpoint.")
+    public void resumeSendingLinkSingleUnsettledDelivery() throws Exception
+    {
+        final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+        final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8));
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+
+            final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class);
+
+            // 1. attach with ReceiverSettleMode.SECOND
+            interaction.attachHandle(linkHandle1)
+                       .attachRole(Role.SENDER)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachTargetAddress(destination)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            // 2. send a unsettled delivery
+            final Disposition responseDisposition = interaction.transferHandle(linkHandle1)
+                                                               .transferDeliveryId(UnsignedInteger.ZERO)
+                                                               .transferDeliveryTag(deliveryTag)
+                                                               .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                               .transfer()
+                                                               .consumeResponse()
+                                                               .getLatestResponse(Disposition.class);
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+            final DeliveryState remoteDeliveryState = responseDisposition.getState();
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Attach responseAttach2 = interaction.attachHandle(linkHandle2)
+                                                      .attachUnsettled(Collections.singletonMap(deliveryTag,
null))
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+
+            // 5. assert content of unsettled map
+            assertThat(responseAttach2.getTarget(), is(notNullValue()));
+            final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+            assertThat(remoteUnsettled, is(notNullValue()));
+            assertThat(remoteUnsettled.keySet(), is(equalTo(Collections.singleton(deliveryTag))));
+            assertThat(remoteUnsettled.get(deliveryTag).getClass(), typeCompatibleWith(remoteDeliveryState.getClass()));
+            assertThat(responseAttach2.getIncompleteUnsettled(), is(anyOf(nullValue(), equalTo(false))));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.7.3",
+            description = "If the local unsettled map is too large to be encoded within a
frame of the agreed maximum"
+                          + " frame size then the session MAY be ended with the frame-size-too-small
error. The"
+                          + " endpoint SHOULD make use of the ability to send an incomplete
unsettled map (see below)"
+                          + " to avoid sending an error.")
+    public void resumeSendingLinkWithIncompleteUnsettled() throws Exception
+    {
+        final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse();
+
+            // 0. Open connection with small max-frame-size
+            final Open open = interaction.openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+                                         .open().consumeResponse()
+                                         .getLatestResponse(Open.class);
+            interaction.begin().consumeResponse(Begin.class);
+
+            // 1. attach with ReceiverSettleMode.SECOND
+            final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+            interaction.attachHandle(linkHandle1)
+                       .attachRole(Role.SENDER)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachTargetAddress(destination)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            // 2. send enough unsettled deliveries to cause incomplete-unsettled to be true
+            //    assume each delivery requires at least 1 byte, therefore max-frame-size
deliveries should be enough
+            interaction.transferHandle(linkHandle1)
+                       .transferPayloadData(TEST_MESSAGE_CONTENT);
+            Map<Binary, DeliveryState> localUnsettled = new HashMap<>(open.getMaxFrameSize().intValue());
+            for (int i = 0; i < open.getMaxFrameSize().intValue(); ++i)
+            {
+                final Binary deliveryTag = new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+                final Disposition responseDisposition = interaction.transferDeliveryId(UnsignedInteger.valueOf(i))
+                                                                   .transferDeliveryTag(deliveryTag)
+                                                                   .transfer()
+                                                                   .consumeResponse(Disposition.class)
+                                                                   .getLatestResponse(Disposition.class);
+                assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+                assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+                localUnsettled.put(deliveryTag, null);
+            }
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+            Map<Binary, DeliveryState> unsettled = Collections.singletonMap(sampleLocalUnsettled,
+                                                                            localUnsettled.get(sampleLocalUnsettled));
+            final Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+                                                          .attachUnsettled(unsettled)
+                                                          .attachIncompleteUnsettled(true)
+                                                          .attach().consumeResponse(End.class,
Attach.class)
+                                                          .getLatestResponse();
+
+            if (latestResponse.getBody() instanceof End)
+            {
+                // 5.a assert session end error
+                final End responseEnd = (End) latestResponse.getBody();
+                final Error error = responseEnd.getError();
+                assertThat(error, is(notNullValue()));
+                assertThat(error.getCondition().getValue(), is(equalTo(AmqpError.FRAME_SIZE_TOO_SMALL)));
+            }
+            else if (latestResponse.getBody() instanceof Attach)
+            {
+                // 5.b assert content of unsettled map
+                final Attach responseAttach2 = (Attach) latestResponse.getBody();
+                assertThat(responseAttach2.getTarget(), is(notNullValue()));
+                final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+                assertThat(remoteUnsettled, is(notNullValue()));
+                assertThat(remoteUnsettled.keySet(), is(not(empty())));
+                for (Binary deliveryTag : remoteUnsettled.keySet())
+                {
+                    assertThat(deliveryTag, isIn(localUnsettled.keySet()));
+                }
+                assertThat(responseAttach2.getIncompleteUnsettled(), is(equalTo(true)));
+            }
+            else
+            {
+                fail(String.format("Unexpected response. Expected End or Attach. Got '%s'.",
latestResponse.getBody()));
+            }
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.7.3", description =
+            "If set to true [incomplete-unsettled] indicates that the unsettled map provided
is not complete. "
+            + "When the map is incomplete the recipient of the map cannot take the absence
of a delivery tag from "
+            + "the map as evidence of settlement. On receipt of an incomplete unsettled map
a sending endpoint MUST "
+            + "NOT send any new deliveries (i.e. deliveries where resume is not set to true)
to its partner (and "
+            + "a receiving endpoint which sent an incomplete unsettled map MUST detach with
an error on "
+            + "receiving a transfer which does not have the resume flag set to true).")
+    public void rejectNewDeliveryWhilstUnsettledIncomplete() throws Exception
+    {
+        final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse();
+
+            // 0. Open connection with small max-frame-size
+            final Open open = interaction.openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+                                         .open().consumeResponse()
+                                         .getLatestResponse(Open.class);
+            interaction.begin().consumeResponse(Begin.class);
+
+            // 1. attach with ReceiverSettleMode.SECOND
+            final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+            interaction.attachHandle(linkHandle1)
+                       .attachRole(Role.SENDER)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachTargetAddress(destination)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            // 2. send enough unsettled deliverys to cause incomplete-unsettled to be true
+            //    assume each delivery requires at least 1 byte, therefore max-frame-size
deliveries should be enough
+            interaction.transferHandle(linkHandle1)
+                       .transferPayloadData(TEST_MESSAGE_CONTENT);
+            Map<Binary, DeliveryState> localUnsettled = new HashMap<>(open.getMaxFrameSize().intValue());
+            for (int i = 0; i < open.getMaxFrameSize().intValue(); ++i)
+            {
+                final Binary deliveryTag = new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+                final Disposition responseDisposition = interaction.transferDeliveryId(UnsignedInteger.valueOf(i))
+                                                                   .transferDeliveryTag(deliveryTag)
+                                                                   .transfer()
+                                                                   .consumeResponse(Disposition.class)
+                                                                   .getLatestResponse(Disposition.class);
+                assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+                assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+                localUnsettled.put(deliveryTag, null);
+            }
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+            Map<Binary, DeliveryState> unsettled = Collections.singletonMap(sampleLocalUnsettled,
+                                                                            localUnsettled.get(sampleLocalUnsettled));
+            final Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+                                                          .attachUnsettled(unsettled)
+                                                          .attachIncompleteUnsettled(true)
+                                                          .attach().consumeResponse(End.class,
Attach.class)
+                                                          .getLatestResponse();
+            assumeThat(latestResponse.getBody(), is(instanceOf(Attach.class)));
+
+            // 5. ensure attach has incomplete-unsettled
+            final Attach responseAttach = (Attach) latestResponse.getBody();
+            assertThat(responseAttach.getIncompleteUnsettled(), is(equalTo(true)));
+
+            // 6. send new transfer
+            final Binary newDeliveryTag = new Binary("newTransfer".getBytes(StandardCharsets.UTF_8));
+            final Detach detachWithError = interaction.transferHandle(linkHandle2)
+                                                      .transferDeliveryId(UnsignedInteger.ONE)
+                                                      .transferDeliveryTag(newDeliveryTag)
+                                                      .transfer().consumeResponse()
+                                                      .getLatestResponse(Detach.class);
+            assertThat(detachWithError.getError(), is(notNullValue()));
+            final Error detachError = detachWithError.getError();
+            assertThat(detachError.getCondition(), is(equalTo(AmqpError.ILLEGAL_STATE)));
+        }
+    }
+
+    @Ignore("QPID-7845")
+    @Test
+    @SpecificationTest(section = "2.7.3", description =
+            "If set to true [incomplete-unsettled] indicates that the unsettled map provided
is not complete. "
+            + "When the map is incomplete the recipient of the map cannot take the absence
of a delivery tag from "
+            + "the map as evidence of settlement. On receipt of an incomplete unsettled map
a sending endpoint MUST "
+            + "NOT send any new deliveries (i.e. deliveries where resume is not set to true)
to its partner (and "
+            + "a receiving endpoint which sent an incomplete unsettled map MUST detach with
an error on "
+            + "receiving a transfer which does not have the resume flag set to true).")
+    public void incompleteUnsettledReceiving() throws Exception
+    {
+        for (int i = 0; i < MIN_MAX_FRAME_SIZE; i++)
+        {
+            getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT
+ "-" + i);
+        }
+
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            // 1. open with small max-frame=512, begin, attach receiver with
+            //    with rcv-settle-mode=second, snd-settle-mode=unsettled,
+            //    flow with incoming-window=MAX_INTEGER and link-credit=MAX_INTEGER
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+                       .open()
+                       .consumeResponse()
+                       .begin()
+                       .consumeResponse(Begin.class)
+                       .attachRole(Role.RECEIVER)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attach()
+                       .consumeResponse(Attach.class);
+
+            Attach attach = interaction.getLatestResponse(Attach.class);
+            assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED)));
+
+            interaction.flowIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE))
+                       .flowLinkCredit(UnsignedInteger.valueOf(Integer.MAX_VALUE))
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            // 2. Receive transfers
+            final Map<Binary, DeliveryState> localUnsettled = new HashMap<>();
+            for (int i = 0; i < MIN_MAX_FRAME_SIZE; )
+            {
+                Response<?> response = interaction.consumeResponse().getLatestResponse();
+                if (response.getBody() instanceof Transfer)
+                {
+                    assertThat(response.getBody(), Matchers.is(instanceOf(Transfer.class)));
+                    Transfer responseTransfer = (Transfer) response.getBody();
+                    assertThat(responseTransfer.getMore(), is(not(equalTo(true))));
+                    assertThat(responseTransfer.getSettled(), is(not(equalTo(true))));
+                    localUnsettled.putIfAbsent(responseTransfer.getDeliveryTag(), responseTransfer.getState());
+                    i++;
+                }
+                else if (response.getBody() instanceof Flow || response.getBody() instanceof
Disposition)
+                {
+                    // ignore
+                }
+                else
+                {
+                    fail("Unexpected frame " + response.getBody());
+                }
+            }
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+            Map<Binary, DeliveryState> unsettled = Collections.singletonMap(sampleLocalUnsettled,
+                                                                            localUnsettled.get(sampleLocalUnsettled));
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+                                                    .attachUnsettled(unsettled)
+                                                    .attachIncompleteUnsettled(true)
+                                                    .attach().consumeResponse(End.class,
Attach.class)
+                                                    .getLatestResponse();
+            assumeThat(latestResponse.getBody(), is(instanceOf(Attach.class)));
+
+            final Attach resumingAttach = (Attach) latestResponse.getBody();
+            final Map<Binary, DeliveryState> remoteUnsettled = resumingAttach.getUnsettled();
+            assertThat(remoteUnsettled, is(notNullValue()));
+            assertThat(remoteUnsettled.keySet(), is(not(empty())));
+            for (Binary deliveryTag : remoteUnsettled.keySet())
+            {
+                assertThat(deliveryTag, isIn(localUnsettled.keySet()));
+            }
+            assertThat(resumingAttach.getIncompleteUnsettled(), is(equalTo(true)));
+
+            interaction.flowHandle(linkHandle2).flow();
+
+            boolean received = false;
+            while (!received)
+            {
+                Response<?> nextResponse = interaction.consumeResponse().getLatestResponse();
+                assertThat(nextResponse, is(notNullValue()));
+
+                if (nextResponse.getBody() instanceof Transfer)
+                {
+                    assertThat(nextResponse.getBody(), is(instanceOf(Transfer.class)));
+                    Transfer responseTransfer = (Transfer) nextResponse.getBody();
+                    assertThat(responseTransfer.getMore(), is(not(equalTo(true))));
+                    assertThat(responseTransfer.getSettled(), is(not(equalTo(true))));
+                    assertThat(responseTransfer.getDeliveryTag(), is(equalTo(sampleLocalUnsettled)));
+                    received = true;
+                }
+                else if (nextResponse.getBody() instanceof Flow || nextResponse.getBody()
instanceof Disposition)
+                {
+                    // ignore
+                }
+                else
+                {
+                    fail("Unexpected frame " + nextResponse.getBody());
+                }
+            }
+
+            transport.doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled
deliveries is resumed,"
+                                                         + " the unsettled field from the
attach frame will carry"
+                                                         + " the delivery-tags and delivery
state of all deliveries"
+                                                         + " considered unsettled by the
issuing link endpoint.")
+    public void resumeReceivingLinkEmptyUnsettled() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction()
+                                                     .negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse()
+                                                     .begin().consumeResponse()
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                     .attach().consumeResponse()
+                                                     .flowIncomingWindow(UnsignedInteger.ONE)
+                                                     .flowLinkCredit(UnsignedInteger.ONE)
+                                                     .flowHandleFromLinkHandle()
+                                                     .flow()
+                                                     .receiveDelivery()
+                                                     .decodeLatestDelivery();
+
+            Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .disposition();
+
+            Detach detach = interaction.detach().consumeResponse().getLatestResponse(Detach.class);
+            assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false)));
+
+            interaction.attachUnsettled(new HashMap<>())
+                       .attach()
+                       .consumeResponse(Attach.class);
+
+            Attach attach = interaction.getLatestResponse(Attach.class);
+
+            Map<Binary, DeliveryState> unsettled = attach.getUnsettled();
+            assertThat(unsettled.entrySet(), empty());
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled
deliveries is resumed,"
+                                                         + " the unsettled field from the
attach frame will carry"
+                                                         + " the delivery-tags and delivery
state of all deliveries"
+                                                         + " considered unsettled by the
issuing link endpoint.")
+    public void resumeReceivingLinkWithSingleUnsettledAccepted() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction()
+                                                     .negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse()
+                                                     .begin().consumeResponse()
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                     .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                                                     .attach().consumeResponse();
+
+            Attach attach = interaction.getLatestResponse(Attach.class);
+            assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED)));
+
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery();
+
+            List<Transfer> transfers = interaction.getLatestDelivery();
+            assertThat(transfers, hasSize(1));
+            Transfer transfer = transfers.get(0);
+
+            Binary deliveryTag = transfer.getDeliveryTag();
+            assertThat(deliveryTag, is(notNullValue()));
+            assertThat(transfer.getSettled(), is(not(equalTo(true))));
+            Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            Detach detach = interaction.detach().consumeResponse().getLatestResponse(Detach.class);
+            assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false)));
+
+            interaction.attachUnsettled(Collections.singletonMap(deliveryTag, new Accepted()))
+                       .attach()
+                       .consumeResponse(Attach.class);
+
+            Attach resumeAttach = interaction.getLatestResponse(Attach.class);
+
+            Map<Binary, DeliveryState> unsettled = resumeAttach.getUnsettled();
+            assertThat(unsettled, is(notNullValue()));
+            assertThat(unsettled.entrySet(), hasSize(1));
+            Map.Entry<Binary, DeliveryState> entry = unsettled.entrySet().iterator().next();
+            assertThat(entry.getKey(), is(equalTo(deliveryTag)));
+
+            interaction.flowNextIncomingId(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery();
+
+            transfers = interaction.getLatestDelivery();
+            assertThat(transfers, hasSize(1));
+            Transfer resumeTransfer = transfers.get(0);
+
+            assertThat(resumeTransfer.getResume(), is(equalTo(true)));
+            assertThat(resumeTransfer.getDeliveryTag(), is(equalTo(deliveryTag)));
+            assertThat(resumeTransfer.getPayload(), is(nullValue()));
+
+            if (!Boolean.TRUE.equals(resumeTransfer.getSettled()))
+            {
+                interaction.dispositionSettled(true)
+                           .dispositionState(new Accepted())
+                           .dispositionRole(Role.RECEIVER)
+                           .disposition();
+            }
+
+            transport.doCloseConnection();
+
+            if (getBrokerAdmin().isQueueDepthSupported())
+            {
+                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
+                           is(equalTo(0)));
+            }
+        }
+    }
+
+    @Ignore("QPID-7845")
+    @Test
+    @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled
deliveries is resumed,"
+                                                         + " the unsettled field from the
attach frame will carry"
+                                                         + " the delivery-tags and delivery
state of all deliveries"
+                                                         + " considered unsettled by the
issuing link endpoint.")
+    public void resumeReceivingLinkOneUnsettledWithNoOutcome() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction()
+                                                     .negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse()
+                                                     .begin().consumeResponse()
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                     .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                                                     .attach().consumeResponse();
+
+            Attach attach = interaction.getLatestResponse(Attach.class);
+            assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED)));
+
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery();
+
+            List<Transfer> transfers = interaction.getLatestDelivery();
+            assertThat(transfers, hasSize(1));
+            Transfer transfer = transfers.get(0);
+
+            Binary deliveryTag = transfer.getDeliveryTag();
+            assertThat(deliveryTag, is(notNullValue()));
+            Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            Detach detach = interaction.detach().consumeResponse(Detach.class).getLatestResponse(Detach.class);
+            assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false)));
+
+            interaction.attachUnsettled(Collections.singletonMap(deliveryTag, null))
+                       .attach()
+                       .consumeResponse(Attach.class);
+
+            Attach resumeAttach = interaction.getLatestResponse(Attach.class);
+
+            Map<Binary, DeliveryState> unsettled = resumeAttach.getUnsettled();
+            assertThat(unsettled, is(notNullValue()));
+            assertThat(unsettled.entrySet(), hasSize(1));
+            Map.Entry<Binary, DeliveryState> entry = unsettled.entrySet().iterator().next();
+            assertThat(entry.getKey(), is(equalTo(deliveryTag)));
+
+            interaction.flowNextIncomingId(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery();
+
+            transfers = interaction.getLatestDelivery();
+            assertThat(transfers, hasSize(1));
+            Transfer resumeTransfer = transfers.get(0);
+
+            assertThat(resumeTransfer.getResume(), is(equalTo(true)));
+            assertThat(resumeTransfer.getDeliveryTag(), is(equalTo(deliveryTag)));
+            assertThat(resumeTransfer.getPayload(), is(notNullValue()));
+
+            interaction.dispositionSettled(true)
+                       .dispositionState(new Accepted())
+                       .dispositionRole(Role.RECEIVER)
+                       .disposition();
+
+            transport.doCloseConnection();
+
+            if (getBrokerAdmin().isQueueDepthSupported())
+            {
+                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
+                           Matchers.is(Matchers.equalTo(0)));
+            }
+        }
+    }
+
+    @Ignore("QPID-7845")
+    @Test
+    @SpecificationTest(section = "2.6.13",
+            description = "When a suspended link having unsettled deliveries is resumed,
the unsettled field from the"
+                          + " attach frame will carry the delivery-tags and delivery state
of all deliveries"
+                          + " considered unsettled by the issuing link endpoint.")
+    public void resumeSendingLinkSinglePartialDelivery() throws Exception
+    {
+        final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+        final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8));
+
+        QpidByteBuffer[] messagePayload = Utils.splitPayload("testData1", 2);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+
+            final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class);
+
+            // 1. attach with ReceiverSettleMode.SECOND
+            interaction.attachHandle(linkHandle1)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(destination)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            // 2. send a partial delivery
+            interaction.transferHandle(linkHandle1)
+                       .transferDeliveryId(UnsignedInteger.ZERO)
+                       .transferDeliveryTag(deliveryTag)
+                       .transferMore(true)
+                       .transferPayload(Collections.singletonList(messagePayload[0]))
+                       .transfer();
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Attach responseAttach2 = interaction.attachHandle(linkHandle2)
+                                                      .attachUnsettled(Collections.singletonMap(deliveryTag,
null))
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+
+            // 5. assert content of unsettled map
+            assertThat(responseAttach2.getTarget(), is(notNullValue()));
+
+            final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+            assertThat(remoteUnsettled, is(notNullValue()));
+            assertThat(remoteUnsettled.keySet(), is(equalTo(Collections.singleton(deliveryTag))));
+
+            interaction.transferHandle(linkHandle2)
+                       .transferResume(true)
+                       .transfer()
+                       .sync()
+                       .transferMore(false)
+                       .transferPayload(Collections.singletonList(messagePayload[1]))
+                       .transfer();
+
+            boolean settled = false;
+            do
+            {
+                interaction.consumeResponse();
+                Response<?> response = interaction.getLatestResponse();
+                assertThat(response, is(notNullValue()));
+
+                Object body = response.getBody();
+
+                if (body instanceof Disposition)
+                {
+                    Disposition disposition = (Disposition) body;
+                    assertThat(disposition.getSettled(), is(Matchers.equalTo(true)));
+                    assertThat(disposition.getFirst(), equalTo(UnsignedInteger.ZERO));
+                    settled = true;
+                }
+                else if (!(body instanceof Flow))
+                {
+                    fail("Unexpected response " + body);
+                }
+            }
+            while (!settled);
+
+            transport.doCloseConnection();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
index 3868246..e313222 100644
--- a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
@@ -63,7 +63,7 @@ public class ZeroPrefetchTest extends QpidBrokerTestCase
         producer.send(secondMessage);
 
 
-        Message receivedMessage = prefetch1consumer.receive(2000l);
+        final Message receivedMessage = prefetch1consumer.receive(2000l);
         assertNotNull("First message was not received", receivedMessage);
         assertEquals("Message property was not as expected", firstPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
 
@@ -73,9 +73,9 @@ public class ZeroPrefetchTest extends QpidBrokerTestCase
         final Session prefetch2session = prefetch2Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer prefetch2consumer = prefetch2session.createConsumer(queue);
 
-        receivedMessage = prefetch2consumer.receive(2000l);
-        assertNotNull("Second message was not received", receivedMessage);
-        assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
+        final Message receivedMessage2 = prefetch2consumer.receive(2000l);
+        assertNotNull("Second message was not received", receivedMessage2);
+        assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage2.getStringProperty(TEST_PROPERTY_NAME));
 
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message