From commits-return-53951-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Wed Oct 31 20:28:41 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9A99418065D for ; Wed, 31 Oct 2018 20:28:40 +0100 (CET) Received: (qmail 53477 invoked by uid 500); 31 Oct 2018 19:28:39 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 53459 invoked by uid 99); 31 Oct 2018 19:28:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Oct 2018 19:28:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A4743E0057; Wed, 31 Oct 2018 19:28:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 31 Oct 2018 19:28:39 -0000 Message-Id: <00ed3ec47cb44d1a976c353802a36293@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-2123 Paging not stopped if there are no messages on one subscription Repository: activemq-artemis Updated Branches: refs/heads/master 3e58cf87a -> 54db13326 ARTEMIS-2123 Paging not stopped if there are no messages on one subscription Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/31399486 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/31399486 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/31399486 Branch: refs/heads/master Commit: 31399486acf62747aef39cd146507c0fd5c0e2cd Parents: 3e58cf8 Author: yang wei Authored: Fri Oct 12 20:32:02 2018 +0800 Committer: Clebert Suconic Committed: Wed Oct 31 15:28:20 2018 -0400 ---------------------------------------------------------------------- .../cursor/impl/PageCursorProviderImpl.java | 19 +++ .../tests/integration/paging/PagingTest.java | 164 +++++++++++++++++++ 2 files changed, 183 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31399486/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index b814d9b..93869d7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -364,6 +364,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { ArrayList cursorList = cloneSubscriptions(); long minPage = checkMinPage(cursorList); + deliverIfNecessary(cursorList, minPage); logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage); @@ -599,6 +600,24 @@ public class PageCursorProviderImpl implements PageCursorProvider { } + private void deliverIfNecessary(Collection cursorList, long minPage) { + boolean currentWriting = minPage == pagingStore.getCurrentWritingPage() ? true : false; + for (PageSubscription cursor : cursorList) { + long firstPage = cursor.getFirstPage(); + if (firstPage == minPage) { + /** + * if first page is current writing page and it's not complete, or + * first page is before the current writing page, we need to trigger + * deliverAsync to delete messages in the pages. + */ + if (cursor.getQueue().getMessageCount() == 0 && (!currentWriting || !cursor.isComplete(firstPage))) { + cursor.getQueue().deliverAsync(); + break; + } + } + } + } + // Inner classes ------------------------------------------------- } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31399486/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 65b5892..180993f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -6289,6 +6289,170 @@ public class PagingTest extends ActiveMQTestBase { server.stop(); } + @Test + public void testStopPagingWithoutConsumersIfTwoPages() throws Exception { + testStopPagingWithoutConsumersOnOneQueue(true); + } + + @Test + public void testStopPagingWithoutConsumersIfOnePage() throws Exception { + testStopPagingWithoutConsumersOnOneQueue(false); + } + + private void testStopPagingWithoutConsumersOnOneQueue(boolean forceAnotherPage) throws Exception { + boolean persistentMessages = true; + + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + try { + ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(false, false, false); + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1 or both=true"), true); + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2 or both=true"), true); + PagingStore store = server.getPagingManager().getPageStore(ADDRESS); + Queue queue = server.locateQueue(PagingTest.ADDRESS.concat("=1")); + queue.getPageSubscription().getPagingStore().startPaging(); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + ClientMessage message = session.createMessage(persistentMessages); + message.putBooleanProperty("both", true); + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + bodyLocal.writeBytes(new byte[1024]); + producer.send(message); + session.commit(); + session.start(); + ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2")); + message = consumer.receive(5000); + assertNotNull(message); + message.acknowledge(); + assertNull(consumer.receiveImmediate()); + consumer.close(); + session.commit(); + + if (forceAnotherPage) { + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + + message = session.createMessage(persistentMessages); + message.putIntProperty("destQ", 1); + bodyLocal = message.getBodyBuffer(); + bodyLocal.writeBytes(new byte[1024]); + producer.send(message); + session.commit(); + + consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1")); + for (int i = 0; i < 2; i++) { + message = consumer.receive(5000); + assertNotNull(message); + message.acknowledge(); + session.commit(); + } + assertNull(consumer.receiveImmediate()); + consumer.close(); + session.close(); + + store.getCursorProvider().cleanup(); + waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); + sf.close(); + locator.close(); + } finally { + try { + server.stop(); + } catch (Throwable ignored) { + } + } + } + + @Test + public void testStopPagingWithoutMsgsOnOneQueue() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + final int numberOfMessages = 500; + + locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, false, false); + + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1"), true); + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2"), true); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + ClientConsumer consumer1 = session.createConsumer(PagingTest.ADDRESS.concat("=1")); + session.start(); + ClientSession session2 = sf.createSession(false, false, false); + ClientConsumer consumer2 = session2.createConsumer(PagingTest.ADDRESS.concat("=2")); + session2.start(); + + ClientMessage message = null; + + byte[] body = new byte[MESSAGE_SIZE]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); + } + + /** + * Here we first send messages and consume them to move every subscription to the next bookmarked page. + * Then we send messages and consume them again, expecting paging is stopped normally. + */ + for (int x = 0; x < 2; x++) { + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + message.putIntProperty("destQ", 1); + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + bodyLocal.writeBytes(body); + producer.send(message); + if (i % 1000 == 0) { + session.commit(); + } + } + session.commit(); + assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); + assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()); + for (int i = 0; i < numberOfMessages; i++) { + ClientMessage msg = consumer1.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + if (i % 500 == 0) { + session.commit(); + } + } + session.commit(); + assertNull(consumer1.receiveImmediate()); + waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); + } + + producer.close(); + consumer1.close(); + consumer2.close(); + session.close(); + session2.close(); + sf.close(); + locator.close(); + locator = null; + sf = null; + server.stop(); + } + + @Override protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception { Configuration configuration = super.createDefaultConfig(serverID, netty);