Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 984ED10E4D for ; Mon, 21 Dec 2015 13:33:06 +0000 (UTC) Received: (qmail 74083 invoked by uid 500); 21 Dec 2015 13:33:06 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 74054 invoked by uid 500); 21 Dec 2015 13:33:06 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 74045 invoked by uid 99); 21 Dec 2015 13:33:06 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Dec 2015 13:33:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id F06031A0120 for ; Mon, 21 Dec 2015 13:33:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.246 X-Spam-Level: * X-Spam-Status: No, score=1.246 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.554] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id ImjfNptWVtgd for ; Mon, 21 Dec 2015 13:33:04 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with ESMTP id 830792050A for ; Mon, 21 Dec 2015 13:33:04 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP 
id 1D664E0428 for ; Mon, 21 Dec 2015 13:33:04 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 1E60C3A05E6 for ; Mon, 21 Dec 2015 13:33:04 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1721151 - in /qpid/java/trunk: client/src/main/java/org/apache/qpid/client/ systests/src/test/java/org/apache/qpid/test/unit/close/ Date: Mon, 21 Dec 2015 13:33:04 -0000 To: commits@qpid.apache.org From: orudyy@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20151221133304.1E60C3A05E6@svn01-us-west.apache.org> Author: orudyy Date: Mon Dec 21 13:33:03 2015 New Revision: 1721151 URL: http://svn.apache.org/viewvc?rev=1721151&view=rev Log: QPID-6951: Release the consumer's prefetched messages on consumer close regardless of whether the session is closed or not. Rename/remove consumer methods so that the method names indicate what each method actually does. 
Add system test Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1721151&r1=1721150&r2=1721151&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Dec 21 13:33:03 2015 @@ -27,7 +27,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -939,7 +938,7 @@ public abstract class AMQSession 0) { @@ -942,7 +937,7 @@ public abstract class BasicMessageConsum if (_synchronousQueue.size() != 0) { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); - rollback(); + releasePendingMessages(); } clearReceiveQueue(); Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1721151&r1=1721150&r2=1721151&view=diff ============================================================================== --- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Dec 21 13:33:03 2015 @@ -454,7 +454,7 @@ public class BasicMessageConsumer_0_10 e return receiveNoWait(); } - @Override public void rollbackPendingMessages() + @Override void releasePendingMessages() { if (getSynchronousQueue().size() > 0) { Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java?rev=1721151&r1=1721150&r2=1721151&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java Mon Dec 21 13:33:03 2015 @@ -108,4 +108,39 @@ public class MessageConsumerCloseTest e assertEquals("Message three has unexpected content", 2, msg3.getIntProperty(INDEX)); session.commit(); } + + public void testMessagesReceivedBeforeConsumerCloseAreRedeliveredAfterRollback() throws Exception + { + Connection connection = getConnection(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + Destination destination = getTestQueue(); + MessageConsumer consumer = session.createConsumer(destination); + + int messageNumber = 4; + connection.start(); + sendMessage(session, destination, messageNumber); + + for(int i = 0; i < messageNumber/2 ; i++) + { + Message message = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message [" + i +"] was null", message); + assertEquals("Message [" + i +"] has unexpected content", i, message.getIntProperty(INDEX)); + } + + consumer.close(); + + session.rollback(); + + MessageConsumer 
consumer2 = session.createConsumer(destination); + + for(int i = 0; i < messageNumber ; i++) + { + Message message = consumer2.receive(RECEIVE_TIMEOUT); + assertNotNull("Message [" + i +"] was null", message); + assertEquals("Message [" + i +"] has unexpected content", i, message.getIntProperty(INDEX)); + } + + session.commit(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org