Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AC76A1862F for ; Fri, 1 Apr 2016 20:07:58 +0000 (UTC) Received: (qmail 19080 invoked by uid 500); 1 Apr 2016 20:07:58 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 19048 invoked by uid 500); 1 Apr 2016 20:07:58 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 19039 invoked by uid 99); 1 Apr 2016 20:07:58 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Apr 2016 20:07:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id B557CC0225 for ; Fri, 1 Apr 2016 20:07:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 9x9SYfSm--sd for ; Fri, 1 Apr 2016 20:07:45 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 592ED5FBC4 for ; Fri, 1 Apr 2016 20:07:43 +0000 (UTC) Received: (qmail 17568 invoked by uid 99); 1 Apr 2016 20:07:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Apr 2016 20:07:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6567EDFFF8; Fri, 1 Apr 2016 20:07:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jensdeppe@apache.org To: commits@geode.incubator.apache.org Date: Fri, 01 Apr 2016 20:07:54 -0000 Message-Id: In-Reply-To: <0c3d6f292b394c59b3541b6b465c7883@git.apache.org> References: <0c3d6f292b394c59b3541b6b465c7883@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/18] incubator-geode git commit: GEODE-1062: Refactor of WANTestBase http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java index d7e0d4f..4669ac9 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java @@ -53,13 +53,10 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); @@ -79,10 +76,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); @@ -139,13 +133,10 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); @@ -165,10 +156,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); @@ -204,13 +192,14 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); @@ -230,15 +219,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); //before doing any puts, let the senders be running in order to ensure that //not a single event will be lost @@ -269,11 +250,8 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { public void testParallelPropagation_withoutRemoteSite() throws Exception { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); //keep a larger batch to minimize number of exception occurrences in the log vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, @@ -294,16 +272,8 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); - vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false )); - vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false )); - vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false )); - vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false )); - //make sure all the senders are running before doing any puts vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -313,15 +283,14 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); - + createCacheInVMs(nyPort, vm2, vm3); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); vm3.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - + + createReceiverInVMs(nyPort, vm2, vm3); + //verify all buckets drained on all sender nodes. vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); @@ -343,13 +312,10 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); @@ -369,12 +335,9 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { "ln", 1, 100, isOffHeap() )); vm7.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null, "ln", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null, "ln", 1, 100, isOffHeap() )); @@ -417,13 +380,10 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); @@ -443,10 +403,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); @@ -488,11 +445,6 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); - - vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); vm4.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); @@ -503,10 +455,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -541,13 +490,10 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); @@ -563,10 +509,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); @@ -593,13 +536,10 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); @@ -615,10 +555,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); @@ -640,13 +577,10 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); @@ -656,11 +590,6 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); vm4.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); @@ -671,11 +600,8 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); - + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); vm3.invoke(() -> WANTestBase.createPartitionedRegion( @@ -711,10 +637,10 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + createCacheInVMs(nyPort, vm2); vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createCache( lnPort )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm3, vm4); vm3.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); @@ -728,10 +654,9 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); vm4.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); - - vm3.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - + + startSenderInVMs("ln", vm3, vm4); + vm3.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", 10 )); @@ -764,17 +689,8 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", 10 )); - AsyncInvocation inv1 = vm3.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - - try{ - inv1.join(); - inv2.join(); - } - catch(InterruptedException ie) { - fail("Caught interrupted exception"); - } - + startSenderInVMsAsync("ln", vm3, vm4); + vm4.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", 40 )); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java index 8c4f142..ddcb3d6 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java @@ -44,13 +44,10 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); @@ -93,13 +90,10 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); @@ -125,13 +119,8 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); //start the senders - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); - - Wait.pause(2000); - + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -153,13 +142,10 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); @@ -179,10 +165,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); @@ -219,13 +202,10 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); @@ -245,10 +225,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); @@ -302,9 +279,10 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); + createCacheInVMs(nyPort, vm4, vm5); vm4.invoke(() -> WANTestBase.createCache( lnPort )); vm5.invoke(() -> WANTestBase.createCache( lnPort )); @@ -322,9 +300,8 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); vm3.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + + startSenderInVMs("ln", vm4, vm5); //wait till the senders are running vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -360,13 +337,14 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); @@ -386,15 +364,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); //make sure all the senders are running before doing any puts vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -425,13 +395,14 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); @@ -451,15 +422,8 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); //make sure all the senders are running before doing any puts vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -527,13 +491,14 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); @@ -553,15 +518,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); //make sure all the senders are running before doing any puts vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -598,10 +555,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes LogWriterUtils.getLogWriter().info("Starting the senders at the same time"); //when puts are happening by another thread, start the senders - vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); LogWriterUtils.getLogWriter().info("All the senders are started"); @@ -627,13 +581,10 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); @@ -653,10 +604,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( getTestMethodName() + "_PR", "ln", 1, 100)); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); @@ -723,13 +671,10 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); LogWriterUtils.getLogWriter().info("Created cache on local site"); @@ -768,11 +713,8 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes //During sender start, it will wait until those buckets are created for shadowPR as well. //Start the senders in async threads, so colocation of shadowPR will be complete and //missing buckets will be created in PRHARedundancyProvider.createMissingBuckets(). - vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); - + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java index b4fa925..552da9e 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java @@ -228,8 +228,11 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort)); Integer pnPort = (Integer)vm3.invoke(() -> createFirstRemoteLocator(4, lnPort)); + createCacheInVMs(nyPort, vm4); vm4.invoke(() -> createReceiver(nyPort)); + createCacheInVMs(tkPort, vm5); vm5.invoke(() -> createReceiver(tkPort)); + createCacheInVMs(pnPort, vm6); vm6.invoke(() -> createReceiver(pnPort)); try { @@ -279,6 +282,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; + createCacheInVMs(nyPort, vm2); vm2.invoke(() -> createReceiver(nyPort)); try { @@ -319,7 +323,9 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes Integer nyPort = locatorPorts[1]; Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort)); + createCacheInVMs(nyPort, vm6); vm6.invoke(() -> createReceiver(nyPort)); + createCacheInVMs(tkPort, vm7); vm7.invoke(() -> createReceiver(tkPort)); try { @@ -364,6 +370,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; + createCacheInVMs(nyPort, vm2); vm2.invoke(() -> createReceiver(nyPort)); try { @@ -401,6 +408,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; + createCacheInVMs(nyPort, vm2); vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); try { @@ -478,6 +486,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes protected void createReceiverAndDoPutsInPausedSender(int port) { // Note: This is a test-specific method used by several tests to do puts from vm4 to vm2. String regionName = getTestMethodName() + "_PR"; + createCacheInVMs(port, vm2); vm2.invoke(() -> createReceiver(port)); vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); vm4.invoke(() -> doPuts(regionName, 10)); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java index 6cd1ae3..b95437c 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java @@ -52,11 +52,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); //keep the batch size high enough to reduce the number of exceptions in the log vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, @@ -64,8 +60,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -81,9 +76,8 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); - - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm3.invoke(() -> WANTestBase.createCache( nyPort )); + + createCacheInVMs(nyPort, vm2, vm3); vm2.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); @@ -105,13 +99,10 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); @@ -123,8 +114,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -152,13 +142,10 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); @@ -170,8 +157,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -194,19 +180,13 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { Integer regionSize = (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_RR" )); LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -216,10 +196,9 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { getTestMethodName() + "_RR", "ln", isOffHeap() )); vm7.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - + + startSenderInVMs("ln", vm4, vm5); + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -250,14 +229,10 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); //these are part of remote site - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); - + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); //these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); //senders are created on local site vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, @@ -278,8 +253,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { getTestMethodName() + "_RR_2", null, isOffHeap() )); //start the senders on local site - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); //create one RR (RR_1) on local site vm4.invoke(() -> WANTestBase.createReplicatedRegion( @@ -335,14 +309,12 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); //these are part of remote site - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); + //these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); //senders are created on local site vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, @@ -357,8 +329,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { getTestMethodName() + "_RR_1", null, isOffHeap() )); //start the senders on local site - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); //create one RR (RR_1) on local site vm4.invoke(() -> WANTestBase.createReplicatedRegion( @@ -407,14 +378,11 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); //these are part of remote site - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); //these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); //senders are created on local site vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, @@ -435,8 +403,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { getTestMethodName() + "_RR_2", null, isOffHeap() )); //start the senders on local site - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); //create one RR (RR_1) on local site vm4.invoke(() -> WANTestBase.createReplicatedRegion( @@ -485,22 +452,20 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { public void testReplicatedSerialPropagationWithRemoteRegionDestroy3() throws Exception { + final String senderId = "ln"; Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); // these are part of remote site - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); // these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); // senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + vm4.invoke(() -> WANTestBase.createConcurrentSender( senderId, 2, false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + vm5.invoke(() -> WANTestBase.createConcurrentSender( senderId, 2, false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); // create one RR (RR_1) on remote site @@ -515,33 +480,29 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR_2", null, isOffHeap() )); - // start the senders on local site - vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); // start the senders on local site - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); // create one RR (RR_1) on local site vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + getTestMethodName() + "_RR_1", senderId, isOffHeap() )); vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + getTestMethodName() + "_RR_1", senderId, isOffHeap() )); vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + getTestMethodName() + "_RR_1", senderId, isOffHeap() )); vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + getTestMethodName() + "_RR_1", senderId, isOffHeap() )); // create another RR (RR_2) on local site vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + getTestMethodName() + "_RR_2", senderId, isOffHeap() )); vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + getTestMethodName() + "_RR_2", senderId, isOffHeap() )); vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + getTestMethodName() + "_RR_2", senderId, isOffHeap() )); vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + getTestMethodName() + "_RR_2", senderId, isOffHeap() )); IgnoredException.addIgnoredException(BatchException70.class.getName()); IgnoredException.addIgnoredException(ServerOperationException.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java index b306fab..a9b4b9d 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java @@ -48,13 +48,10 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver(nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver(nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); //keep the maxQueueMemory low enough to trigger eviction vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, @@ -67,8 +64,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -93,13 +89,10 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); @@ -111,8 +104,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -139,13 +131,12 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + createCacheInVMs(nyPort, vm2); + createCacheInVMs(tkPort, vm3); vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); vm3.invoke(() -> WANTestBase.createReceiver( tkPort )); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); @@ -162,11 +153,9 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "lnSerial1" )); - vm5.invoke(() -> WANTestBase.startSender( "lnSerial1" )); + startSenderInVMs("lnSerial1", vm4, vm5); - vm4.invoke(() -> WANTestBase.startSender( "lnSerial2" )); - vm5.invoke(() -> WANTestBase.startSender( "lnSerial2" )); + startSenderInVMs("lnSerial2", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); @@ -194,13 +183,10 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); @@ -212,8 +198,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -242,13 +227,10 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); @@ -260,8 +242,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -287,13 +268,10 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); @@ -305,8 +283,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); @@ -331,13 +308,10 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, @@ -355,8 +329,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName(), null, 1, 100, isOffHeap() )); @@ -374,13 +347,14 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, @@ -389,13 +363,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { false, 100, 10, false, false, new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName(), "ln", isOffHeap() )); @@ -418,22 +386,20 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + vm2.invoke(() -> WANTestBase.createCache(nyPort)); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + WANTestBase.createCacheInVMs(lnPort, vm4, vm5); vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - Wait.pause(500); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm4.invoke(() -> WANTestBase.createNormalRegion( getTestMethodName() + "_RR", "ln" )); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java index cfa2dc0..3de19e2 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java @@ -55,13 +55,10 @@ public class DistTXWANDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -77,8 +74,7 @@ public class DistTXWANDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); @@ -98,13 +94,10 @@ public class DistTXWANDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -120,8 +113,7 @@ public class DistTXWANDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); @@ -140,13 +132,10 @@ public class DistTXWANDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -166,10 +155,7 @@ public class DistTXWANDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java index be5bea7..cba42df 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java @@ -80,13 +80,10 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -115,10 +112,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() )); @@ -166,13 +160,10 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, true, null, true )); @@ -201,10 +192,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() )); @@ -260,14 +248,11 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); //create receiver on remote site - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(nyPort, vm2, vm3); //create cache in local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); //create senders with disk store String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); @@ -311,11 +296,8 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { //start the senders on local site - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); - + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + //wait for senders to become running vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -400,11 +382,8 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { //start the senders in async mode. This will ensure that the //node of shadow PR that went down last will come up first - vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); - + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + LogWriterUtils.getLogWriter().info("Waiting for senders running."); //wait for senders running vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java index 0d7216d..680f6bc 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java @@ -60,14 +60,17 @@ public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase { // Site 1 Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + createCacheInVMs(lnPort, vm1); Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver( lnPort )); //Site 2 Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + createCacheInVMs(nyPort, vm3); Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); //Site 3 Integer tkPort = (Integer)vm4.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + createCacheInVMs(tkPort, vm5); Integer tkRecPort = (Integer) vm5.invoke(() -> WANTestBase.createReceiver( tkPort )); LogWriterUtils.getLogWriter().info("Created locators and receivers in 3 distributed systems"); @@ -141,10 +144,12 @@ public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase { // Site 1 Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + vm1.invoke(() -> WANTestBase.createCache(lnPort)); Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver( lnPort )); //Site 2 Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + vm3.invoke(() -> WANTestBase.createCache(nyPort)); Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems"); @@ -244,10 +249,12 @@ public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() { // Site 1 Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + createCacheInVMs(lnPort, vm1); Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver( lnPort )); //Site 2 Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + createCacheInVMs(nyPort, vm3); Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems"); @@ -349,10 +356,12 @@ public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() { // Site 1 Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + createCacheInVMs(lnPort, vm1); Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver( lnPort )); - + //Site 2 Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + createCacheInVMs(nyPort, vm3); Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");