Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8472F1823E for ; Thu, 28 Jan 2016 03:46:52 +0000 (UTC) Received: (qmail 51643 invoked by uid 500); 28 Jan 2016 03:46:52 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 51611 invoked by uid 500); 28 Jan 2016 03:46:52 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 51602 invoked by uid 99); 28 Jan 2016 03:46:52 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Jan 2016 03:46:52 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id D51331A063E for ; Thu, 28 Jan 2016 03:46:51 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.247 X-Spam-Level: * X-Spam-Status: No, score=1.247 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id A_GWiPhOlL93 for ; Thu, 28 Jan 2016 03:46:43 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 3127F24E24 for ; Thu, 28 Jan 2016 03:46:41 +0000 (UTC) Received: (qmail 51514 invoked by uid 99); 28 Jan 2016 03:46:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Jan 2016 03:46:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3AD1FDFF96; Thu, 28 Jan 2016 03:46:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: boglesby@apache.org To: commits@geode.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-geode git commit: GEODE-865: CI Failure: ConcurrentParallelGatewaySenderOperation_2_DUnitTest.testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion Date: Thu, 28 Jan 2016 03:46:40 +0000 (UTC) Repository: incubator-geode Updated Branches: refs/heads/wan_cq_donation 09bd53079 -> 68a85fed9 GEODE-865: CI Failure: ConcurrentParallelGatewaySenderOperation_2_DUnitTest.testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/68a85fed Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/68a85fed Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/68a85fed Branch: refs/heads/wan_cq_donation Commit: 68a85fed991d7ebd9519c0f77b5b5b564b965a56 Parents: 09bd530 Author: Barry Oglesby Authored: Tue Jan 26 13:43:21 2016 -0800 Committer: Barry Oglesby Committed: Wed Jan 27 18:02:19 2016 -0800 ---------------------------------------------------------------------- .../gemfire/internal/cache/wan/WANTestBase.java | 14 +- ...allelGatewaySenderOperation_2_DUnitTest.java | 841 +++++-------------- 2 files changed, 234 insertions(+), 621 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/68a85fed/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java index 0c32d1f..3dd4742 100644 --- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java @@ -2395,7 +2395,7 @@ public class WANTestBase extends DistributedTestCase{ public static void createConcurrentSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, - boolean isManulaStart, int concurrencyLevel, OrderPolicy policy) { + boolean isManualStart, int concurrencyLevel, OrderPolicy policy) { File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -2408,7 +2408,7 @@ public class WANTestBase extends DistributedTestCase{ gateway.setParallel(true); gateway.setMaximumQueueMemory(maxMemory); gateway.setBatchSize(batchSize); - gateway.setManualStart(isManulaStart); + gateway.setManualStart(isManualStart); ((InternalGatewaySenderFactory) gateway) .setLocatorDiscoveryCallback(new MyLocatorCallback()); if (filter != null) { @@ -2430,7 +2430,7 @@ public class WANTestBase extends DistributedTestCase{ GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); gateway.setMaximumQueueMemory(maxMemory); gateway.setBatchSize(batchSize); - gateway.setManualStart(isManulaStart); + gateway.setManualStart(isManualStart); ((InternalGatewaySenderFactory) gateway) .setLocatorDiscoveryCallback(new MyLocatorCallback()); if (filter != null) { @@ -4825,7 +4825,13 @@ public class WANTestBase extends DistributedTestCase{ } } } - + + protected Integer[] createLNAndNYLocators() { + Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); + return new Integer[] { lnPort, nyPort }; + } + public static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/68a85fed/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java index f770062..694fc1f 100644 --- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java @@ -23,6 +23,8 @@ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; import com.gemstone.gemfire.internal.cache.wan.WANTestBase; import com.gemstone.gemfire.test.dunit.AsyncInvocation; import com.gemstone.gemfire.test.dunit.DistributedTestCase; +import com.gemstone.gemfire.test.dunit.VM; + /** * @author skumar * @@ -46,239 +48,75 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes // to test that when userPR is locally destroyed, shadow Pr is also locally // destroyed and on recreation usrePr , shadow Pr is also recreated. public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); - try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - getLogWriter().info("Created PRs on local site"); - - vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", - 10 }); - - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + createAndStartSender(vm4, lnPort, 5, false, true); - // since sender is paused, no dispatching - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); + createReceiverAndDoPutsInPausedSender(nyPort); - vm4.invoke(WANTestBase.class, "localDestroyRegion", - new Object[] { testName + "_PR" }); + vm4.invoke(() -> localDestroyRegion(testName + "_PR")); - // since shodowPR is locally destroyed, so no data to dispatch - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); - - vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { - testName + "_PR", 10, 20 }); - - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + recreatePRDoPutsAndValidateRegionSizes(0, true); } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } } public void testParallelGatewaySender_SingleNode_UserPR_Destroy_RecreateRegion() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); + createAndStartSender(vm4, lnPort, 4, false, true); - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, false, 4, OrderPolicy.KEY }); + createReceiverAndDoPutsInPausedSender(nyPort); - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm4.invoke(() -> resumeSender("ln")); - vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); + vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - getLogWriter().info("Created PRs on local site"); + vm4.invoke(() -> localDestroyRegion(testName + "_PR")); - vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", - 10 }); - - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - - // since resume is paused, no dispatching - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); - - vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" }); + recreatePRDoPutsAndValidateRegionSizes(10, false); + } finally { + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + } - vm4.invoke(WANTestBase.class, "destroyRegion", new Object[] { testName - + "_PR" }); + public void testParallelGatewaySender_SingleNode_UserPR_Close_RecreateRegion() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; - // before destoy, there is wait for queue to drain, so data will be - // dispatched - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + createAndStartSender(vm4, lnPort, 7, false, true); - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); + createReceiverAndDoPutsInPausedSender(nyPort); - vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { - testName + "_PR", 10, 20 }); + vm4.invoke(() -> closeRegion(testName + "_PR")); - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + vm4.invoke(() -> resumeSender("ln")); - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 20 }); - } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); - } - } - - - public void testParallelGatewaySender_SingleNode_UserPR_Close_RecreateRegion() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); + pause(500); //paused if there is any element which is received on remote site - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + recreatePRDoPutsAndValidateRegionSizes(0, false); - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", new Object[] { lnPort }); - - getLogWriter().info("Created cache on local site"); - - getLogWriter().info("Created senders on local site"); - - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, - true, 100, 10, false, false, null, false, 7, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap()}); - - getLogWriter().info("Created PRs on local site"); - - vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 10 }); - - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - - //since resume is paused, no dispatching - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); - - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, "closeRegion", new Object[] { testName + "_PR" }); - - vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" }); - - pause(500); //paused if there is any element which is received on remote site - - //before close, there is wait for queue to drain, so data will be dispatched - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { testName + "_PR", 10, 20 }); - - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } //to test that while localDestroy is in progress, put operation does not successed public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_SimultenuousPut_RecreateRegion() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); - - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - getLogWriter().info("Created cache on local site"); + createAndStartSender(vm4, lnPort, 5, false, true); - getLogWriter().info("Created senders on local site"); - - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - getLogWriter().info("Created PRs on local site"); - - vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", - 10 }); - - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - - // since resume is paused, no dispatching - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); + createReceiverAndDoPutsInPausedSender(nyPort); AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class, "doPutsFrom", new Object[] { testName + "_PR", 100, 2000 }); @@ -302,73 +140,30 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes putAsync.getException()); } - vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" }); + vm4.invoke(() -> resumeSender("ln")); pause(500); // paused if there is any element which is received on remote // site - // since shodowPR is locally destroyed, so no data to dispatch - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { - testName + "_PR", 10, 20 }); - - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + recreatePRDoPutsAndValidateRegionSizes(0, false); } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } } public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultenuousPut_RecreateRegion() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); - - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, false, 6, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - vm4.invoke(WANTestBase.class, "addCacheListenerAndDestroyRegion", new Object[] { - testName + "_PR"}); - - getLogWriter().info("Created PRs on local site"); - - vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", - 10 }); + createAndStartSender(vm4, lnPort, 6, false, true); - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + vm4.invoke(() -> addCacheListenerAndDestroyRegion(testName + "_PR")); - // since resume is paused, no dispatching - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); + createReceiverAndDoPutsInPausedSender(nyPort); - vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" }); + vm4.invoke(() -> resumeSender("ln")); AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class, "doPutsFrom", new Object[] { testName + "_PR", 10, 101 }); @@ -387,27 +182,18 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes // before destroy, there is wait for queue to drain, so data will be // dispatched - vm2.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "validateRegionSizeWithinRange", new Object[] { testName + "_PR", 10, - 101 }); // possible size is more than 10 + vm2.invoke(() -> validateRegionSizeWithinRange(testName + "_PR", 10, 101)); // possible size is more than 10 - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); + vm4.invoke(() -> createPartitionedRegion(testName + "_PR", "ln", 1, 10, isOffHeap())); - vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { - testName + "_PR", 10, 20 }); + vm4.invoke(() -> doPutsFrom(testName + "_PR", 10, 20)); - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + vm4.invoke(() -> validateRegionSize(testName + "_PR", 10)); - vm2.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "validateRegionSizeWithinRange", new Object[] { testName + "_PR", 20, - 101 });// possible size is more than 20 + vm2.invoke(() -> validateRegionSizeWithinRange(testName + "_PR", 20, 101)); // possible size is more than 20 } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } - } public void testParallelGatewaySender_SingleNode_UserPR_Destroy_NodeDown() @@ -415,60 +201,16 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes addExpectedException("Broken pipe"); addExpectedException("Connection reset"); addExpectedException("Unexpected IOException"); - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); - - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - vm6.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY }); - vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY }); - vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); - vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); - vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - getLogWriter().info("Created PRs on local site"); - - vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", - 10000 }); + createAndStartSender(vm4, lnPort, 5, false, true); + createAndStartSender(vm5, lnPort, 5, false, true); + createAndStartSender(vm6, lnPort, 5, false, true); - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10000 }); - - // since resume is paused, no dispatching - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); + createReceiverAndDoPutsInPausedSender(nyPort); vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" }); vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" }); @@ -488,60 +230,24 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes fail("Interrupted the async invocation."); } - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10000 });// possible size is more than 20 + vm2.invoke(() -> validateRegionSize(testName + "_PR", 10)); } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); - vm6.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm6.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } } public void testParallelGatewaySender_SingleNode_UserPR_Close_SimultenuousPut_RecreateRegion() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); - - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - getLogWriter().info("Created cache on local site"); - - getLogWriter().info("Created senders on local site"); + createAndStartSender(vm4, lnPort, 5, false, true); - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - getLogWriter().info("Created PRs on local site"); - - vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", - 10 }); - - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - - // since resume is paused, no dispatching - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); + createReceiverAndDoPutsInPausedSender(nyPort); AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class, "doPutsFrom", new Object[] { testName + "_PR", 10, 2000 }); @@ -556,140 +262,108 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes fail("Interrupted the async invocation."); } - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 0 }); - - vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 10, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { - testName + "_PR", 10, 20 }); + recreatePRDoPutsAndValidateRegionSizes(0, true); + } finally { + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + } - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + private void createReceiverAndDoPutsInPausedSender(int port) { + // Note: This is a test-specific method used by several tests to do puts from vm4 to vm2. + String regionName = testName + "_PR"; + vm2.invoke(() -> createReceiver(port)); + vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm4.invoke(() -> doPuts(regionName, 10)); + vm4.invoke(() -> validateRegionSize(regionName, 10)); + // since sender is paused, no dispatching + vm2.invoke(() -> validateRegionSize(regionName, 0)); + } - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); - } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + private void recreatePRDoPutsAndValidateRegionSizes(int expectedRegionSize, boolean resumeSender) { + // Note: This is a test-specific method used by several test to recreate a partitioned region, + // do puts and validate region sizes in vm2 and vm4. + // since shadowPR is locally destroyed, so no data to dispatch + String regionName = testName + "_PR"; + vm2.invoke(() -> validateRegionSize(regionName, expectedRegionSize)); + if (resumeSender) { + vm4.invoke(() -> resumeSender("ln")); } + vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); + validateRegionSizes(regionName, 10, vm4, vm2); } - + public void testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); - Integer tkPort = (Integer)vm2.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 3, lnPort }); - Integer pnPort = (Integer)vm3.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 4, lnPort }); - - vm4.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); - vm5.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort }); - vm6.invoke(WANTestBase.class, "createReceiver", new Object[] { pnPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort)); + Integer pnPort = (Integer)vm3.invoke(() -> createFirstRemoteLocator(4, lnPort)); + + vm4.invoke(() -> createReceiver(nyPort)); + vm5.invoke(() -> createReceiver(tkPort)); + vm6.invoke(() -> createReceiver(pnPort)); try { - vm7.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); + vm7.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(lnPort)); getLogWriter().info("Created cache on local site"); - vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln1", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY }); + vm7.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); + vm7.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); + vm7.invoke(() -> createConcurrentSender("ln3", 4, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); - vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln2", 3, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY }); + vm7.invoke(() -> startSender("ln1")); + vm7.invoke(() -> startSender("ln2")); + vm7.invoke(() -> startSender("ln3")); - vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln3", 4, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY }); + String regionName = testName + "_PR"; + vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap())); - vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); - vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" }); - vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln3" }); + getLogWriter().info("Created PRs on local site"); - vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln1,ln2,ln3", 1, 10, isOffHeap() }); + vm4.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm5.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm6.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - getLogWriter().info("Created PRs on local site"); + vm7.invoke(() -> doPuts(regionName, 10)); - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); - vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 10, isOffHeap() }); + vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln1")); + vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln2")); + vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln3")); - vm7.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", - 10 }); - pause(1000); - vm7.invoke(WANTestBase.class, "localDestroyRegion", - new Object[] { testName + "_PR" }); + vm7.invoke(() -> localDestroyRegion(regionName)); - vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln1,ln2,ln3", 1, 10, isOffHeap() }); + vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap())); - vm7.invoke(WANTestBase.class, "doPutsFrom", new Object[] { - testName + "_PR", 10, 20 }); + vm7.invoke(() -> doPutsFrom(regionName, 10, 20)); - vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 10 }); + vm7.invoke(() -> validateRegionSize(regionName, 10)); - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 20 }); - vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 20 }); - vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 20 }); + validateRegionSizes(regionName, 20, vm4, vm5, vm6); } finally { - vm7.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm7.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } } public void testParallelGatewaySender_MultipleNode_UserPR_localDestroy_Recreate() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm2.invoke(() -> createReceiver(nyPort)); try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 100, isOffHeap() }); - vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 100, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY }); - vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - - getLogWriter().info("Created PRs on local site"); + createAndStartSender(vm4, lnPort, 5, true, false); + createAndStartSender(vm5, lnPort, 5, true, false); - vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 100, isOffHeap() }); + String regionName = testName + "_PR"; + vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts", - new Object[] { testName + "_PR", 1000 }); + new Object[] { regionName, 10 }); pause(1000); - vm5.invoke(WANTestBase.class, "localDestroyRegion", - new Object[] { testName + "_PR" }); + vm5.invoke(() -> localDestroyRegion(regionName)); try { inv1.join(); @@ -698,79 +372,45 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes fail("Interrupted the async invocation."); } - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 1000 }); - - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 1000 }); - vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln", 1, 100, isOffHeap() }); + validateRegionSizes(regionName, 10, vm4, vm2); - vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { - testName + "_PR", 1000, 2000 }); + vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 2000 }); + vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 2000 }); + validateRegionSizes(regionName, 20, vm4, vm2); } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } } public void testParallelGatewaySenders_MultiplNode_UserPR_localDestroy_Recreate() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); - Integer tkPort = (Integer)vm2.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 3, lnPort }); - - vm6.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); - vm7.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort }); - + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort)); + + vm6.invoke(() -> createReceiver(nyPort)); + vm7.invoke(() -> createReceiver(tkPort)); + try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() }); - vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln1", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY }); - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln2", 3, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY }); - vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln1", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY }); - vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln2", 3, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" }); - vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); - vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" }); + createAndStartTwoSenders(vm4, lnPort); + createAndStartTwoSenders(vm5, lnPort); + String regionName = testName + "_PR"; vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 100, isOffHeap() }); + regionName, null, 1, 100, isOffHeap() }); vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", null, 1, 100, isOffHeap() }); + regionName, null, 1, 100, isOffHeap() }); AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts", - new Object[] { testName + "_PR", 1000 }); + new Object[] { regionName, 10 }); + pause(1000); vm5.invoke(WANTestBase.class, "localDestroyRegion", - new Object[] { testName + "_PR" }); + new Object[] { regionName }); try { inv1.join(); @@ -779,79 +419,52 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes fail("Interrupted the async invocation."); } - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 1000 }); - - vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 1000 }); - vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 1000 }); + validateRegionSizes(regionName, 10, vm4, vm6, vm7); vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { - testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() }); + regionName, "ln1,ln2", 1, 100, isOffHeap() }); vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { - testName + "_PR", 1000, 2000 }); + regionName, 10, 20 }); - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 2000 }); - - vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 2000 }); - vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - testName + "_PR", 2000 }); + validateRegionSizes(regionName, 20, vm4, vm6, vm7); } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } } - - public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); - vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + private void createAndStartTwoSenders(VM vm, int port) { + // Note: This is a test-specific method used to create and start 2 senders. + vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); + vm.invoke(() -> createPartitionedRegion(testName + "_PR", "ln1,ln2", 1, 100, isOffHeap())); + vm.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY)); + vm.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY)); + vm.invoke(() -> startSender("ln1")); + vm.invoke(() -> startSender("ln2")); + } - try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - vm4.invoke(WANTestBase.class, - "createCustomerOrderShipmentPartitionedRegion", new Object[] { null, - "ln", 1, 100, isOffHeap() }); - vm5.invoke(WANTestBase.class, - "createCustomerOrderShipmentPartitionedRegion", new Object[] { null, - "ln", 1, 100, isOffHeap() }); + public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY }); - vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY }); + vm2.invoke(() -> createReceiver(nyPort)); - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + try { + createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 5, true); + createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 5, true); getLogWriter().info("Created PRs on local site"); - vm2.invoke(WANTestBase.class, - "createCustomerOrderShipmentPartitionedRegion", new Object[] { null, - null, 1, 100, isOffHeap() }); + vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, null, 1, 100, isOffHeap())); AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, - "putcolocatedPartitionedRegion", new Object[] { 2000 }); + "putcolocatedPartitionedRegion", new Object[] { 10 }); pause(1000); try { - vm5.invoke(WANTestBase.class, "localDestroyRegion", - new Object[] { customerRegionName }); + vm5.invoke(() -> localDestroyRegion(customerRegionName)); } catch (Exception ex) { assertTrue(ex.getCause() instanceof UnsupportedOperationException); } @@ -862,51 +475,29 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes fail("Unexpected exception", e); } - vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - customerRegionName, 2000 }); - vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - customerRegionName, 2000 }); - vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { - customerRegionName, 2000 }); + validateRegionSizes(customerRegionName, 10, vm4, vm5, vm2); } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } - } - + + private void validateRegionSizes(String regionName, int expectedRegionSize, VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> validateRegionSize(regionName, expectedRegionSize)); + } + } + public void testParallelGatewaySender_ColocatedPartitionedRegions_destroy() throws Exception { - Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, - "createFirstLocatorWithDSId", new Object[] { 1 }); - Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, - "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); try { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", - new Object[] { lnPort }); - - vm4.invoke(WANTestBase.class, - "createCustomerOrderShipmentPartitionedRegion", new Object[] { null, - "ln", 1, 100, isOffHeap() }); - vm5.invoke(WANTestBase.class, - "createCustomerOrderShipmentPartitionedRegion", new Object[] { null, - "ln", 1, 100, isOffHeap() }); - - vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY }); - vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { - "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY }); - - vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); - vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 6, true); + createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 6, true); getLogWriter().info("Created PRs on local site"); @@ -927,13 +518,29 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } fail("Excpeted UnsupportedOperationException"); } finally { - vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); - vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, - "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME"); + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); } } - + + private void createAndStartSenderWithCustomerOrderShipmentRegion(VM vm, int port, int concurrencyLevel, boolean manualStart) { + vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); + vm.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, "ln", 1, 100, isOffHeap())); + vm.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, manualStart, concurrencyLevel, OrderPolicy.KEY)); + vm.invoke(() -> startSender("ln")); + } + + private void createAndStartSender(VM vm, int port, int concurrencyLevel, boolean manualStart, boolean pause) { + vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); + vm.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, manualStart, concurrencyLevel, OrderPolicy.KEY)); + vm.invoke(() -> startSender("ln")); + if (pause) { + vm.invoke(() -> pauseSender("ln")); + } + vm.invoke(() -> createPartitionedRegion(testName + "_PR", "ln", 1, 10, isOffHeap())); + getLogWriter().info("Created PRs on local site"); + } + public static void createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME( Integer locPort) { createCache(false, locPort);