Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 99895200B78 for ; Thu, 18 Aug 2016 23:18:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 98107160A86; Thu, 18 Aug 2016 21:18:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 42BDF160AAE for ; Thu, 18 Aug 2016 23:18:31 +0200 (CEST) Received: (qmail 69951 invoked by uid 500); 18 Aug 2016 21:18:25 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 69942 invoked by uid 99); 18 Aug 2016 21:18:25 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Aug 2016 21:18:25 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id D9D5C180698 for ; Thu, 18 Aug 2016 21:18:24 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id rH-N-n7x_udU for ; Thu, 18 Aug 2016 21:18:06 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id A9E155FBD0 for ; Thu, 18 Aug 2016 21:18:02 +0000 (UTC) Received: (qmail 66070 invoked by uid 99); 18 Aug 2016 21:18:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Aug 2016 21:18:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9DA2EE78A1; Thu, 18 Aug 2016 21:18:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: udo@apache.org To: commits@geode.incubator.apache.org Date: Thu, 18 Aug 2016 21:18:06 -0000 Message-Id: <23d0574781244201a875161257fa645e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/50] [abbrv] incubator-geode git commit: GEODE-1241: Fixed the misspelt names in Geode WAN module archived-at: Thu, 18 Aug 2016 21:18:33 -0000 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java new file mode 100644 index 0000000..0cb60be --- /dev/null +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java @@ -0,0 +1,1336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.EntryExistsException; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.internal.cache.wan.BatchException70; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +@Category(DistributedTest.class) +public class SerialWANPropagationDUnitTest extends WANTestBase { + + @Override + public final void postSetUpWANTestBase() throws Exception { + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection refused"); + IgnoredException.addIgnoredException("could not get remote locator information"); + IgnoredException.addIgnoredException("Unexpected IOException"); + } + + /** + * this test is disabled due to a high rate of failure in unit test runs + * see ticket #52190 + */ + @Ignore("TODO: test is disabled because of #52190") + @Test + public void testReplicatedSerialPropagation_withoutRemoteLocator() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //keep the batch size high enough to reduce the number of exceptions in the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 400, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 400, false, false, null, true )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + protected SerializableRunnableIF createReplicatedRegionRunnable() { + return () -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() ); + } + + @Test + public void testReplicatedSerialPropagation_withoutRemoteSite() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //keep the batch size high enough to reduce the number of exceptions in the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 400, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 400, false, false, null, true )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); + + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + createCacheInVMs(nyPort, vm2, vm3); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + /** + * Added to reproduce the bug #46595 + */ + @Test + public void testReplicatedSerialPropagationWithoutRemoteSite_defect46595() + throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + // reduce the batch-size so maximum number of batches will be sent + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 5, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 5, false, false, null, true )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); + + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + IgnoredException.addIgnoredException(IOException.class.getName()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10000 )); + + // pause for some time before starting up the remote site + Wait.pause(10000); + + createCacheInVMs(nyPort, vm2, vm3); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + } + + @Test + public void testReplicatedSerialPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testReplicatedSerialPropagationWithLocalSiteClosedAndRebuilt() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + //---------close local site and build again----------------------------------------- + vm4.invoke(() -> WANTestBase.killSender( )); + vm5.invoke(() -> WANTestBase.killSender( )); + vm6.invoke(() -> WANTestBase.killSender( )); + vm7.invoke(() -> WANTestBase.killSender( )); + + Integer regionSize = + (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_RR" )); + LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + IgnoredException.addIgnoredException(EntryExistsException.class.getName()); + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + //---------------------------------------------------------------------------------- + + //verify remote site receives all the events + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + /** + * Two regions configured with the same sender and put is in progress + * on both the regions. + * One of the two regions is destroyed in the middle. + */ + @Test + public void testReplicatedSerialPropagationWithLocalRegionDestroy() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + //these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //senders are created on local site + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 20, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 20, false, false, null, true )); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + //create another RR (RR_2) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + //create another RR (RR_2) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + //do puts in RR_2 in main thread + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 500 )); + //destroy RR_2 after above puts are complete + vm4.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); + + inv1.join(); + + //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_2", 500 )); + } + + /** + * 1 region and sender configured on local site and 1 region and a + * receiver configured on remote site. Puts to the local region are in progress. + * Remote region is destroyed in the middle. + */ + @Test + public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + //these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //senders are created on local site + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 500, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 500, false, false, null, true )); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + //destroy RR_1 in remote site + vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_1")); + + inv1.join(); + + //verify that all is well in local site. All the events should be present in local region + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 1000 )); + //assuming some events might have been dispatched before the remote region was destroyed, + //sender's region queue will have events less than 1000 but the queue will not be empty. + //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of + //more in depth validations. + vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" )); + } + + /** + * Two regions configured in local with the same sender and put is in progress + * on both the regions. Same two regions are configured on remote site as well. + * One of the two regions is destroyed in the middle on remote site. + */ + @Test + public void testReplicatedSerialPropagationWithRemoteRegionDestroy2() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + //these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //senders are created on local site + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 200, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 200, false, false, null, true )); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + //create another RR (RR_2) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + //create another RR (RR_2) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + //destroy RR_2 on remote site in the middle + vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); + + //expected exceptions in the logs + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + //start puts in RR_2 in another thread + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + + inv1.join(); + + //though region RR_2 is destroyed, RR_1 should still get all the events put in it + //in local site + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 1000 )); + } + + @Test + public void testReplicatedSerialPropagationWithRemoteRegionDestroy3() + throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + // these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + // these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + // senders are created on local site + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 200, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 200, false, false, null, true )); + + // create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + // create another RR (RR_2) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + + // start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + // create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + // create another RR (RR_2) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + // start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + // start puts in RR_2 in another thread + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); + // destroy RR_2 on remote site in the middle + vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + + "_RR_2" )); + + inv1.join(); + inv2.join(); + + // though region RR_2 is destroyed, RR_1 should still get all the events put + // in it + // in local site + try { + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 1000 )); + } finally { + System.setProperty( + DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + vm4.invoke(new CacheSerializableRunnable("UnSetting system property ") { + public void run2() throws CacheException { + System.setProperty( + DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + } + }); + + vm5.invoke(new CacheSerializableRunnable("UnSetting system property ") { + public void run2() throws CacheException { + System.setProperty( + DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + } + }); + } + } + + /** + * one region and sender configured on local site and the same region and a + * receiver configured on remote site. Puts to the local region are in progress. + * Receiver on remote site is stopped in the middle by closing remote site cache. + */ + @Test + public void testReplicatedSerialPropagationWithRemoteReceiverStopped() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + //these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //senders are created on local site. Batch size is kept to a high (170) so + //there will be less number of exceptions (occur during dispatchBatch) in the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 350, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 350, false, false, null, true )); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 500 )); + //close cache in remote site. This will automatically kill the remote receivers. + vm2.invoke(() -> WANTestBase.closeCache()); + + inv1.join(); + + //verify that all is well in local site + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 500 )); + vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" )); + } + + @Test + public void testReplicatedSerialPropagationWithRemoteReceiverRestarted() + throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + // these are part of remote site + vm2.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + // these are part of local site + createCacheInVMs(lnPort, vm4, vm5); + + // senders are created on local site. Batch size is kept to a high (170) so + // there will be less number of exceptions (occur during dispatchBatch) in + // the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 350, false, false, null, true )); + + // create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + // start the senders on local site + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + // create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + + // start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); + // close cache in remote site. This will automatically kill the remote + // receivers. + Wait.pause(2000); + vm2.invoke(() -> WANTestBase.closeCache()); + + inv1.join(); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + + // verify that all is well in local site + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 8000 )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + + vm2.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); + } + + @Test + public void testReplicatedSerialPropagationWithRemoteReceiverRestarted_SenderReceiverPersistent() + throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + // these are part of remote site + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + // these are part of local site + createCacheInVMs(lnPort, vm4, vm5); + + // senders are created on local site. Batch size is kept to a high (170) so + // there will be less number of exceptions (occur during dispatchBatch) in + // the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 350, false, true, null, true )); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 350, false, true, null, true)); + + // create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); + vm2.invoke(() -> WANTestBase.addListenerToSleepAfterCreateEvent(2000, getTestMethodName() + "_RR_1")); + // start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + // create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR_1", "ln", isOffHeap())); + + // start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); + // close cache in remote site. This will automatically kill the remote + // receivers. + vm2.invoke(() -> WANTestBase.closeCache()); + + inv1.join(); + + // verify that all is well in local site + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); + + vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); + vm2.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0)); + + vm2.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats(1, 1)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); + } + + @Test + public void testReplicatedSerialPropagationWithRemoteSiteBouncedBack_ReceiverPersistent() + throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + // these are part of remote site + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + // these are part of local site + createCacheInVMs(lnPort, vm4, vm5); + + // senders are created on local site. Batch size is kept to a high (170) so + // there will be less number of exceptions (occur during dispatchBatch) in + // the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 350, false, false, null, true )); + + // create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + // start the senders on local site + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + // create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + // start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); + // close cache in remote site. This will automatically kill the remote + // receivers. + Wait.pause(2000); + vm1.invoke(() -> WANTestBase.shutdownLocator()); + vm2.invoke(() -> WANTestBase.closeCache()); + + inv1.join(); + + // Do some extra puts after cache close so that some events are in the queue. + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + + // verify that all is well in local site + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 8000 )); + + vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); + + vm1.invoke(() -> WANTestBase.bringBackLocatorOnOldPort( + 2, lnPort, nyPort )); + + createCacheInVMs(nyPort, vm2); + + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 8000 )); + + vm2.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); + } + + @Test + public void testReplicatedSerialPropagationWithRemoteSiteBouncedBackWithMultipleRemoteLocators() + throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort1 = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer nyPort2 = (Integer) vm3.invoke(() -> WANTestBase.createSecondRemoteLocator( 2, nyPort1, lnPort )); + + // these are part of remote site + createCacheInVMs(nyPort1, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + // these are part of local site + createCacheInVMs(lnPort, vm4, vm5); + + // senders are created on local site. Batch size is kept to a high (170) so + // there will be less number of exceptions (occur during dispatchBatch) in + // the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 350, false, false, null, true )); + + // create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + // start the senders on local site + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + // create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + // start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); + // close cache in remote site. This will automatically kill the remote + // receivers. + Wait.pause(2000); + vm1.invoke(() -> WANTestBase.shutdownLocator()); + vm2.invoke(() -> WANTestBase.closeCache()); + + inv1.join(); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + // verify that all is well in local site + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 8000 )); + + vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); + + createCacheInVMs(nyPort2, vm6); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + + vm6.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); + } + + @Test + public void testReplicatedSerialPropagationWithRemoteReceiverRestartedOnOtherNode() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + // these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + + // these are part of local site + createCacheInVMs(lnPort, vm4); + + // senders are created on local site. Batch size is kept to a high (170) so + // there will be less number of exceptions (occur during dispatchBatch) in + // the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 350, false, false, null, true )); + + // create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); + vm2.invoke(() -> WANTestBase.createReceiver()); + + vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); + + vm2.invoke(() -> addListenerToSleepAfterCreateEvent(2000, getTestMethodName() + "_RR_1")); + vm3.invoke(() -> addListenerToSleepAfterCreateEvent(2000, getTestMethodName() + "_RR_1")); + + // create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR_1", "ln", isOffHeap())); + // start the senders on local site + vm4.invoke(() -> WANTestBase.startSender("ln")); + + // start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); + // close cache in remote site. This will automatically kill the remote + // receivers. + vm2.invoke(() -> WANTestBase.closeCache()); + vm3.invoke(() -> WANTestBase.closeCache()); + + inv1.join(); + + // verify that all is well in local site + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); + + vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); + + createCacheInVMs(nyPort, vm3); + vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); + vm3.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0)); + + vm3.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); + } + + @Test + public void testReplicatedSerialPropagationToTwoWanSites() throws Exception { + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + createCacheInVMs(tkPort, vm3); + vm3.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("lnSerial1", vm4, vm5); + startSenderInVMs("lnSerial2", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testReplicatedSerialPropagationHA() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); + + AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); + Wait.pause(2000); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + + inv1.join(); + inv2.join(); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + } + + /** + * Local site:: vm4: Primary vm5: Secondary + * + * Remote site:: vm2, vm3, vm6, vm7: All hosting receivers + * + * vm4 is killed, so vm5 takes primary charge + * + * SUR: disabling due to connection information not available in open source + * enable this once in closed source + */ + @Ignore("TODO: test is disabled") + @Test + public void testReplicatedSerialPropagationHA_ReceiverAffinity() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3, vm6, vm7); + createReceiverInVMs(vm2, vm3, vm6, vm7); + + LogWriterUtils.getLogWriter().info("Started receivers on remote site"); + + WANTestBase.createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + LogWriterUtils.getLogWriter().info("Started senders on local site"); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + + AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); + LogWriterUtils.getLogWriter().info("Started async puts on local site"); + Wait.pause(1000); + + Map oldConnectionInfo = (Map)vm4.invoke(() -> WANTestBase.getSenderToReceiverConnectionInfo( "ln" )); + assertNotNull(oldConnectionInfo); + String oldServerHost = (String)oldConnectionInfo.get("serverHost"); + int oldServerPort = (Integer)oldConnectionInfo.get("serverPort"); + LogWriterUtils.getLogWriter().info("Got sender to receiver connection information"); + + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + inv2.join(); + LogWriterUtils.getLogWriter().info("Killed primary sender on local site"); + Wait.pause(5000);// give some time for vm5 to take primary charge + + Map newConnectionInfo = (Map)vm5.invoke(() -> WANTestBase.getSenderToReceiverConnectionInfo( "ln" )); + assertNotNull(newConnectionInfo); + String newServerHost = (String)newConnectionInfo.get("serverHost"); + int newServerPort = (Integer)newConnectionInfo.get("serverPort"); + LogWriterUtils.getLogWriter().info("Got new sender to receiver connection information"); + assertEquals(oldServerHost, newServerHost); + assertEquals(oldServerPort, newServerPort); + + LogWriterUtils.getLogWriter() + .info( + "Matched the new connection info with old connection info. Receiver affinity verified."); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + } + + /** + * Local site:: vm4: Primary vm5: Secondary + * + * Remote site:: vm2, vm3, vm6, vm7: All hosting receivers + * + * vm4 is killed, so vm5 takes primary charge. vm4 brought up. vm5 is killed, + * so vm4 takes primary charge again. + * + * SUR: commenting due to connection information not available in open source + * enable this once in closed source + */ + @Ignore("TODO: test is disabled") + @Test + public void testReplicatedSerialPropagationHA_ReceiverAffinityScenario2() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3, vm6, vm7); + createReceiverInVMs(vm2, vm3, vm6, vm7); + + LogWriterUtils.getLogWriter().info("Started receivers on remote site"); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + LogWriterUtils.getLogWriter().info("Started senders on local site"); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + + AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); + LogWriterUtils.getLogWriter().info("Started async puts on local site"); + Wait.pause(1000); + + Map oldConnectionInfo = (Map)vm4.invoke(() -> WANTestBase.getSenderToReceiverConnectionInfo( "ln" )); + assertNotNull(oldConnectionInfo); + String oldServerHost = (String)oldConnectionInfo.get("serverHost"); + int oldServerPort = (Integer)oldConnectionInfo.get("serverPort"); + LogWriterUtils.getLogWriter().info("Got sender to receiver connection information"); + + // ---------------------------- KILL vm4 + // -------------------------------------- + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + inv2.join(); + LogWriterUtils.getLogWriter().info("Killed vm4 (primary sender) on local site"); + // ----------------------------------------------------------------------------- + + vm5.invoke(() -> WANTestBase.waitForSenderToBecomePrimary( "ln" )); + LogWriterUtils.getLogWriter().info("vm5 sender has now acquired primary status"); + Wait.pause(5000);// give time to process unprocessedEventsMap + + // ---------------------------REBUILD vm4 + // -------------------------------------- + LogWriterUtils.getLogWriter().info("Rebuilding vm4...."); + createCacheInVMs(lnPort, vm4); + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(createReplicatedRegionRunnable()); + LogWriterUtils.getLogWriter().info("Rebuilt vm4"); + // ----------------------------------------------------------------------------- + + // --------------------------- KILL vm5 + // ---------------------------------------- + inv1.join();// once the puts are done, kill vm5 + LogWriterUtils.getLogWriter().info("puts in vm5 are done"); + + inv2 = vm5.invokeAsync(() -> WANTestBase.killSender()); + inv2.join(); + vm4.invoke(() -> WANTestBase.waitForSenderToBecomePrimary( "ln" )); + // ----------------------------------------------------------------------------- + + Map newConnectionInfo = (Map)vm4.invoke(() -> WANTestBase.getSenderToReceiverConnectionInfo( "ln" )); + assertNotNull(newConnectionInfo); + String newServerHost = (String)newConnectionInfo.get("serverHost"); + int newServerPort = (Integer)newConnectionInfo.get("serverPort"); + LogWriterUtils.getLogWriter().info("Got new sender to receiver connection information"); + assertEquals(oldServerHost, newServerHost); + assertEquals(oldServerPort, newServerPort); + LogWriterUtils.getLogWriter() + .info( + "Matched the new connection info with old connection info. Receiver affinity verified."); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + } + + @Test + public void testNormalRegionSerialPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + vm5.invoke(() -> WANTestBase.startSender( "ln" )); + + vm4.invoke(() -> WANTestBase.createNormalRegion( + getTestMethodName() + "_RR", "ln" )); + vm5.invoke(() -> WANTestBase.createNormalRegion( + getTestMethodName() + "_RR", "ln" )); + + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, + 0, 0, 0)); + + vm5.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, + 1000, 0, 0 )); + + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000)); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0 )); + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0)); + } + + /** + * Added for defect #48582 NPE when WAN sender configured but not started. + * All to all topology with 2 WAN sites: + * Site 1 (LN site): vm4, vm5, vm6, vm7 + * Site 2 (NY site): vm2, vm3 + */ + @Test + public void testReplicatedSerialPropagationWithRemoteSenderConfiguredButNotStarted() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + createReceiverInVMs(vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); + + vm2.invoke(() -> WANTestBase.createSender( "ny", 1, + false, 100, 10, false, false, null, true )); + vm3.invoke(() -> WANTestBase.createSender( "ny", 1, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ny", isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ny", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + startSenderInVMs("ny", vm2, vm3); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java index 4b53626..6859249 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java @@ -180,24 +180,24 @@ public class SerialWANPropagationLoopBackDUnitTest extends WANTestBase { false )); int totalSize = 3; - int increament = 1; + int increment = 1; final Map keyValues = new HashMap(); - for(int i=0; i< increament; i++) { + for(int i=0; i< increment; i++) { keyValues.put(i, i); } vm3.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", keyValues )); keyValues.clear(); - for(int i = increament; i< 2*increament; i++) { + for(int i = increment; i< 2*increment; i++) { keyValues.put(i, i); } vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", keyValues )); keyValues.clear(); - for(int i=2*increament; i< totalSize; i++) { + for(int i=2*increment; i< totalSize; i++) { keyValues.put(i, i); } vm5.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java new file mode 100644 index 0000000..47dee96 --- /dev/null +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.CancelException; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.internal.cache.ForceReattemptException; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +@Category(DistributedTest.class) +public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public SerialWANPropagation_PartitionedRegionDUnitTest() { + super(); + } + + @Test + public void testPartitionedSerialPropagation() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + //vm5.invoke(() -> WANTestBase.startSender( "ln" )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testBothReplicatedAndPartitionedSerialPropagation() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testSerialReplicatedAndPartitionedPropagation() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial", + 2, false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.startSender( "lnSerial" )); + vm5.invoke(() -> WANTestBase.startSender( "lnSerial" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testSerialReplicatedAndSerialPartitionedPropagation() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + + vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", + 2, false, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "lnSerial2", + 2, false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); + + startSenderInVMs("lnSerial1", vm4, vm5); + + startSenderInVMs("lnSerial2", vm5, vm6); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testPartitionedSerialPropagationToTwoWanSites() throws Exception { + + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort )); + Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3,lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + createCacheInVMs(tkPort, vm3); + vm3.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + startSenderInVMs("lnSerial1", vm4, vm5); + startSenderInVMs("lnSerial2", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testPartitionedSerialPropagationHA() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //do initial 100 puts to create all the buckets + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + IgnoredException.addIgnoredException(CancelException.class.getName()); + IgnoredException.addIgnoredException(CacheClosedException.class.getName()); + IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + //start async puts + AsyncInvocation inv = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + //close the cache on vm4 in between the puts + vm4.invoke(() -> WANTestBase.killSender()); + + inv.join(); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testPartitionedSerialPropagationWithParallelThreads() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + createReceiverInVMs(vm2, vm3); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + startSenderInVMs("ln", vm4, vm5); + + + + vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( // TODO: eats exceptions + getTestMethodName() + "_PR", 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } +}