qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject qpid-jms git commit: QPIDJMS-292: ensure credit handling accounts for buffered messages correctly to prevent prefetching more than the expected number of messages
Date Thu, 11 May 2017 10:44:03 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 310f719dd -> b0b554d75


QPIDJMS-292: ensure credit handling accounts for buffered messages correctly to prevent prefetching
more than the expected number of messages


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b0b554d7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b0b554d7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b0b554d7

Branch: refs/heads/master
Commit: b0b554d752f1f0320d73c7645ec89bb51677e5e6
Parents: 310f719
Author: Robert Gemmell <robbie@apache.org>
Authored: Thu May 11 11:06:37 2017 +0100
Committer: Robert Gemmell <robbie@apache.org>
Committed: Thu May 11 11:43:13 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |   2 +-
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |  19 ++-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  17 ++-
 .../integration/ConsumerIntegrationTest.java    | 143 ++++++++++++++++++-
 .../QueueBrowserIntegrationTest.java            |  12 +-
 .../TransactionsIntegrationTest.java            |  16 +--
 .../qpid/jms/meta/JmsConsumerInfoTest.java      |  43 +++---
 .../jms/meta/JmsDefaultResourceVisitorTest.java |   2 +-
 .../amqp/AmqpSubscriptionTrackerTest.java       |   2 +-
 .../provider/amqp/message/AmqpCodecTest.java    |   2 +-
 .../message/AmqpJmsMessageTypesTestCase.java    |   2 +-
 .../failover/FailoverProviderTestSupport.java   |   2 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |   5 +
 13 files changed, 200 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index b9170e0..77349e8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -95,7 +95,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy();
         JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy();
 
-        consumerInfo = new JmsConsumerInfo(consumerId);
+        consumerInfo = new JmsConsumerInfo(consumerId, messageQueue);
         consumerInfo.setExplicitClientID(connection.isExplicitClientID());
         consumerInfo.setSelector(selector);
         consumerInfo.setDurable(isDurableSubscription());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index 61f353d..74256ff 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -21,6 +21,7 @@ import org.apache.qpid.jms.policy.JmsDefaultDeserializationPolicy;
 import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
 import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.util.MessageQueue;
 
 public final class JmsConsumerInfo extends JmsAbstractResource implements Comparable<JmsConsumerInfo> {
 
@@ -38,6 +39,7 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
     private boolean localMessageExpiry;
     private boolean presettle;
     private volatile boolean listener;
+    private final MessageQueue messageQueue;
 
     private JmsRedeliveryPolicy redeliveryPolicy;
     private JmsDeserializationPolicy deserializationPolicy;
@@ -45,23 +47,16 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
     // Can be used to track the last consumed message.
     private transient long lastDeliveredSequenceId;
 
-    public JmsConsumerInfo(JmsConsumerId consumerId) {
+    public JmsConsumerInfo(JmsConsumerId consumerId, MessageQueue messageQueue) {
         if (consumerId == null) {
             throw new IllegalArgumentException("Consumer ID cannot be null");
         }
         this.consumerId = consumerId;
-    }
-
-    public JmsConsumerInfo(JmsSessionInfo sessionInfo, long consumerId) {
-        if (sessionInfo == null) {
-            throw new IllegalArgumentException("Session info object cannot be null");
-        }
-
-        this.consumerId = new JmsConsumerId(sessionInfo.getId(), consumerId);
+        this.messageQueue = messageQueue;
     }
 
     public JmsConsumerInfo copy() {
-        JmsConsumerInfo info = new JmsConsumerInfo(consumerId);
+        JmsConsumerInfo info = new JmsConsumerInfo(consumerId, messageQueue);
         copy(info);
         return info;
     }
@@ -83,6 +78,10 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
         info.listener = listener;
     }
 
+    public int getPrefetchedMessageCount() {
+        return messageQueue.size();
+    }
+
     @Override
     public JmsConsumerId getId() {
         return consumerId;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 515defa..b5194cf 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -332,7 +332,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      * or we are stopping then we never send credit here.
      */
     private void sendFlowIfNeeded() {
-        if (getResourceInfo().getPrefetchSize() == 0 || isStopping()) {
+        int prefetchSize = getResourceInfo().getPrefetchSize();
+        if (prefetchSize == 0 || isStopping()) {
             // TODO: isStopping isn't effective when this method is called following
             // processing the last of any messages received while stopping, since that
             // happens just after we stopped. That may be ok in some situations however, and
@@ -341,10 +342,16 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         }
 
         int currentCredit = getEndpoint().getCredit();
-        if (currentCredit <= getResourceInfo().getPrefetchSize() * 0.3) {
-            int newCredit = getResourceInfo().getPrefetchSize() - currentCredit;
-            LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), newCredit);
-            getEndpoint().flow(newCredit);
+        if (currentCredit <= prefetchSize * 0.5) {
+            int prefetchedMessageCount = getResourceInfo().getPrefetchedMessageCount();
+
+            int potentialPrefetch = currentCredit + prefetchedMessageCount;
+            if(potentialPrefetch <= prefetchSize * 0.7) {
+                int additionalCredit = prefetchSize - currentCredit - prefetchedMessageCount;
+
+                LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), additionalCredit);
+                getEndpoint().flow(additionalCredit);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 0677cca..d57b8fc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -889,8 +889,9 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.expectReceiverAttach();
             testPeer.expectLinkFlowThenPerformUnexpectedDeliveryCountAdvanceThenCreditTopupThenTransfers(prefetch, topUp, messageCount);
 
-            // Expect consumer to top up the credit window to <prefetch> when accepting the messages
-            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(prefetch)));
+            // Expect consumer to top up the credit window to <prefetch-1> when accepting the first message, accounting
+            // for the fact the second prefetched message is still in its local buffer.
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(prefetch-1)));
             testPeer.expectDisposition(true, new AcceptedMatcher(), 0, 0);
             testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
 
@@ -1786,4 +1787,142 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testLinkCreditReplenishmentWithPrefetchFilled() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final int prefetch = 10;
+            int initialMessageCount = 10;
+
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + prefetch);
+            connection.start();
+
+            final CountDownLatch expected = new CountDownLatch(initialMessageCount);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    expected.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+
+            // Expect initial credit to be sent, respond with some messages using it all
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
+                initialMessageCount, false, false, equalTo(UnsignedInteger.valueOf(prefetch)), 1, false, false);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            // Ensure all the messages arrived so that the matching below is deterministic
+            assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
+
+            // Now consume the first 2 messages, expect the ack processing NOT to provoke flowing more credit, as over
+            // 70% of the max potential prefetch (8 outstanding local messages) remains in play.
+            int consumed = 0;
+            for (consumed = 1; consumed <= 2; consumed ++) {
+                testPeer.expectDisposition(true, new AcceptedMatcher(), consumed, consumed);
+                Message message = consumer.receiveNoWait();
+                assertNotNull("Should have received a message " + consumed, message);
+            }
+
+            // Now consume 3rd message, expect the ack processing to provoke flowing more credit as it hits the
+            // 70% low threshhold (credit + prefetched) for replenishing the credit to max out the prefetch window
+            // again, accounting for the already-prefetched but still not yet consumed messages.
+            // Also have the peer send more messages using all the remaining credit granted.
+            consumed = 3;
+            int newOutstandingCredit = 3;
+            assertEquals("Peer cant send more messages than we will have credit for", newOutstandingCredit, prefetch - initialMessageCount + consumed);
+
+            final CountDownLatch expected2 = new CountDownLatch(newOutstandingCredit);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    expected2.countDown();
+                }
+            });
+
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
+                    newOutstandingCredit, false, false, equalTo(UnsignedInteger.valueOf(newOutstandingCredit)), initialMessageCount + 1, false, false);
+            testPeer.expectDisposition(true, new AcceptedMatcher(), consumed, consumed);
+
+            Message message = consumer.receiveNoWait();
+            assertNotNull("Should have received a 3rd message " + consumed, message);
+
+            // Ensure all the new messages arrived so that the matching below is deterministic
+            assertTrue("Expected transfers didnt occur: " + expected2.getCount(), expected2.await(5, TimeUnit.SECONDS));
+
+            // Consume the rest of the messages, 4-13. Expect only dispositions initially until the threshold, then
+            // another flow expanding the window to the limit again, then more dispositions, then another flow as the
+            // threshold is crossed again when the buffered message count decreases further.
+            for (consumed = 4; consumed < prefetch + newOutstandingCredit; consumed++) {
+                if(consumed == 3 + newOutstandingCredit) {
+                    testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(newOutstandingCredit)));
+                } else if(consumed == 3 + newOutstandingCredit * 2) {
+                    testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(newOutstandingCredit * 2)));
+                }
+                testPeer.expectDisposition(true, new AcceptedMatcher(), consumed, consumed);
+
+                message = consumer.receiveNoWait();
+                assertNotNull("Should have received a message " + consumed, message);
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testLinkCreditReplenishmentWithPrefetchTrickleFeed() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final int prefetch = 10;
+
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=" + prefetch);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+
+            // Expect initial credit to be sent, respond with a single message.
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
+                1, false, false, equalTo(UnsignedInteger.valueOf(prefetch)), 1, false, false);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            // Now consume a message and have the peer send another in response, repeat until consumed 15 messages
+            int consumed = 0;
+            for (consumed = 1; consumed <= 15; consumed ++) {
+                if(consumed == 5 || consumed == 10 || consumed == 15) {
+                    testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(prefetch)));
+                }
+
+                testPeer.expectDisposition(true, new AcceptedMatcher(), consumed, consumed);
+                if(consumed != 15) {
+                    testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, amqpValueNullContent, consumed +1);
+                }
+
+                Message message = consumer.receive(3000);
+                assertNotNull("Should have received a message " + consumed, message);
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
index 3809454..5fb64cc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
@@ -208,8 +208,8 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
             // at which point we send one, and a response flow to indicate the rest of the credit was drained.
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
                 1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
-            // Expect the credit window to be opened again.
-            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            // Expect the credit window to be opened again, but accounting for the message we just prefetched.
+            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
             testPeer.expectDetach(true, true, true);
 
             QueueBrowser browser = session.createBrowser(queue);
@@ -246,8 +246,8 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
             // at which point we send one, and a response flow to indicate the rest of the credit was drained.
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
                 1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
-            // Expect the credit window to be opened again.
-            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            // Expect the credit window to be opened again, but accounting for the message we just prefetched.
+            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
             testPeer.expectDetach(true, true, true);
 
             QueueBrowser browser = session.createBrowser(queue);
@@ -296,8 +296,8 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
             // at which point we send one, and a response flow to indicate the rest of the credit was drained.
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
                 1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
-            // Expect a non-draining flow to reopen the credit window again afterwards
-            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
+            // Expect a non-draining flow to reopen the credit window again afterwards, but accounting for the message we just prefetched.
+            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
 
             QueueBrowser browser = session.createBrowser(queue);
             Enumeration<?> queueView = browser.getEnumeration();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 866b254..47af07f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -815,11 +815,11 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             session.createConsumer(queue);
 
-            // Create a producer to use in provoking creation of the AMQP transaction
+            // Create a producer
             testPeer.expectSenderAttach();
             MessageProducer producer = session.createProducer(queue);
 
-            // Expect the message which provoked creating the transaction
+            // Expect the message
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
             messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true));
@@ -842,17 +842,14 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // and reply with accepted and settled disposition to indicate the rollback succeeded
             testPeer.expectDischarge(txnId, true);
 
-            // Now expect an unsettled 'declare' transfer to the txn coordinator, and
+            // Now expect an unsettled 'declare' transfer to the txn coordinator for the next txn, and
             // reply with a declared disposition state containing the txnId.
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
             testPeer.expectDeclare(txnId);
 
-            // Expect the consumer to be 'started' again as rollback completes
-            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
-
-            testPeer.expectDischarge(txnId, true);
             session.rollback();
 
+            testPeer.expectDischarge(txnId, true);
             testPeer.expectClose();
             connection.close();
 
@@ -921,12 +918,9 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
             testPeer.expectDeclare(txnId);
 
-            // Expect the consumer to be 'started' again as rollback completes
-            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
-            testPeer.expectDischarge(txnId, true);
-
             session.rollback();
 
+            testPeer.expectDischarge(txnId, true);
             testPeer.expectClose();
             connection.close();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
index d4fdcc5..db8e0a8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
@@ -58,31 +58,20 @@ public class JmsConsumerInfoTest {
 
     @Test(expected=IllegalArgumentException.class)
     public void testExceptionWhenCreatedWithNullConnectionId() {
-        new JmsConsumerInfo(null);
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void testExceptionWhenCreatedWithNullSessionInfo() {
-        new JmsConsumerInfo(null, 1);
+        new JmsConsumerInfo(null, null);
     }
 
     @Test
     public void testCreateFromConsumerId() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertSame(firstId, info.getId());
         assertSame(firstId.getParentId(), info.getParentId());
         assertNotNull(info.toString());
     }
 
     @Test
-    public void testCreateFromSessionId() {
-        JmsConsumerInfo info = new JmsConsumerInfo(new JmsSessionInfo(firstSessionId), 1);
-        assertNotNull(info.toString());
-    }
-
-    @Test
     public void testCopy() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
 
         info.setAcknowledgementMode(1);
         info.setBrowser(true);
@@ -119,7 +108,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsDurable() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertFalse(info.isDurable());
         info.setDurable(true);
         assertTrue(info.isDurable());
@@ -127,7 +116,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsExplicitClientID() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertFalse(info.isExplicitClientID());
         info.setExplicitClientID(true);
         assertTrue(info.isExplicitClientID());
@@ -135,7 +124,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsShared() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertFalse(info.isShared());
         info.setShared(true);
         assertTrue(info.isShared());
@@ -145,7 +134,7 @@ public class JmsConsumerInfoTest {
     public void testGetSubscriptionName() {
         String subName = "name";
 
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertNull(info.getSubscriptionName());
         info.setSubscriptionName(subName);
         assertEquals(subName, info.getSubscriptionName());
@@ -153,8 +142,8 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testCompareTo() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
 
         assertEquals(-1, first.compareTo(second));
         assertEquals(0, first.compareTo(first));
@@ -163,8 +152,8 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testHashCode() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
 
         assertEquals(first.hashCode(), first.hashCode());
         assertEquals(second.hashCode(), second.hashCode());
@@ -174,8 +163,8 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testEqualsCode() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
 
         assertEquals(first, first);
         assertEquals(second, second);
@@ -189,7 +178,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testVisit() throws Exception {
-        final JmsConsumerInfo first = new JmsConsumerInfo(firstId);
+        final JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
 
         final AtomicBoolean visited = new AtomicBoolean();
 
@@ -207,7 +196,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testGetRedeliveryPolicyDefaults() {
-        final JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        final JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
 
         assertNotNull(info.getRedeliveryPolicy());
         info.setRedeliveryPolicy(null);
@@ -217,7 +206,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsListener() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertFalse(info.isListener());
         info.setListener(true);
         assertTrue(info.isListener());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
index 1dd09e4..00bf2f4 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
@@ -48,7 +48,7 @@ public class JmsDefaultResourceVisitorTest {
         JmsDefaultResourceVisitor visitor = new JmsDefaultResourceVisitor();
         visitor.processConnectionInfo(new JmsConnectionInfo(connectionId));
         visitor.processSessionInfo(new JmsSessionInfo(sessionId));
-        visitor.processConsumerInfo(new JmsConsumerInfo(consumerId));
+        visitor.processConsumerInfo(new JmsConsumerInfo(consumerId, null));
         visitor.processProducerInfo(new JmsProducerInfo(producerId));
         visitor.processDestination(new JmsTemporaryTopic("Test"));
         visitor.processTransactionInfo(new JmsTransactionInfo(sessionId, transactionId));

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
index daa0b56..f60af09 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
@@ -45,7 +45,7 @@ public class AmqpSubscriptionTrackerTest {
         JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, consumerIdCounter.incrementAndGet());
         JmsTopic topic = new JmsTopic(topicName);
 
-        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId);
+        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId, null);
 
         consumerInfo.setSubscriptionName(subscriptionName);
         consumerInfo.setDestination(topic);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
index b549b28..7ebd50f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
@@ -75,7 +75,7 @@ public class AmqpCodecTest extends QpidJmsTestCase {
 
         JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, 1);
         mockConsumer = Mockito.mock(AmqpConsumer.class);
-        Mockito.when(mockConsumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId));
+        Mockito.when(mockConsumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null));
     }
 
     //----- AmqpHeader encode and decode -------------------------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
index 813cc82..dede595 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
@@ -131,7 +131,7 @@ public class AmqpJmsMessageTypesTestCase extends QpidJmsTestCase {
         AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class);
         Mockito.when(consumer.getConnection()).thenReturn(connection);
         Mockito.when(consumer.getDestination()).thenReturn(consumerDestination);
-        Mockito.when(consumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId));
+        Mockito.when(consumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null));
         return consumer;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
index 498f36a..19f6d3e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
@@ -78,7 +78,7 @@ public class FailoverProviderTestSupport extends QpidJmsTestCase {
 
     protected JmsConsumerInfo createConsumerInfo(JmsSessionInfo session) {
         JmsConsumerId id = new JmsConsumerId(session.getId(), nextConsumerId.incrementAndGet());
-        JmsConsumerInfo consumer = new JmsConsumerInfo(id);
+        JmsConsumerInfo consumer = new JmsConsumerInfo(id, null);
         return consumer;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b0b554d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 488675f..3d99b75 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1683,6 +1683,11 @@ public class TestAmqpPeer implements AutoCloseable
                 }
             });
 
+            if(i != messageCount -1) {
+                // Ensure all but the last transfer are set to defer, ensure they go in one write.
+                transferResponseSender.setDeferWrite(true);
+            }
+
             composite.add(transferResponseSender);
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message