qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [5/9] qpid-jms git commit: add test that rolls back transaction with prefetch already full, leading to the drain request not resulting in a response Flow
Date Tue, 09 Dec 2014 16:03:08 GMT
add test that rolls back transaction with prefetch already full, leading to the drain request
not resulting in a response Flow


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/12fc1804
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/12fc1804
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/12fc1804

Branch: refs/heads/master
Commit: 12fc1804b372060e28f2830287cdd99eb179e64a
Parents: 68ee9f5
Author: Robert Gemmell <robbie@apache.org>
Authored: Tue Dec 9 12:27:53 2014 +0000
Committer: Robert Gemmell <robbie@apache.org>
Committed: Tue Dec 9 12:27:53 2014 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 67 ++++++++++++++++++++
 1 file changed, 67 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/12fc1804/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 3303d5d..1d4d304 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -548,6 +548,73 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=5000)
+    public void testRollbackTransactedSessionWithPrefetchFullBeforeDrain() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            int messageCount = 5;
+            ((JmsConnection) connection).getPrefetchPolicy().setAll(messageCount);
+            connection.start();
+
+            testPeer.expectBegin(true);
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a consumer and fill the prefetch with messages, which we wont consume any of
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
+
+            session.createConsumer(queue);
+
+            // Create a producer to use in provoking creation of the AMQP transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true);
+
+            // Expect the message which provoked creating the transaction
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher( new MessageAnnotationsSectionMatcher(true));
+            testPeer.expectTransfer(messageMatcher); //TODO: check it is marked as being in the transaction
+
+            producer.send(session.createMessage());
+
+            // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'. We will NOT send a flow
+            // response as we have manipulated that all the 'on the wire' credit was already used.
+            testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ZERO));
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with accepted and settled disposition to indicate the rollback succeeded
+            Discharge discharge = new Discharge();
+            discharge.setFail(true);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true);
+
+            // Expect the messages that were not consumed to be released
+            for (int i = 1; i <= messageCount; i++) {
+                testPeer.expectDisposition(true, new ReleasedMatcher());
+            }
+
+            // Expect the consumer to be 'started' again as rollback completes
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
+
+            session.rollback();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=5000)
     public void testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
             Connection connection = testFixture.establishConnecton(testPeer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message