Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 18ED1200BA5 for ; Wed, 14 Sep 2016 00:44:00 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 17666160AD3; Tue, 13 Sep 2016 22:44:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 99092160AD2 for ; Wed, 14 Sep 2016 00:43:58 +0200 (CEST) Received: (qmail 28597 invoked by uid 500); 13 Sep 2016 22:43:57 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 28587 invoked by uid 99); 13 Sep 2016 22:43:57 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 22:43:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 6A1721A92B0 for ; Tue, 13 Sep 2016 22:43:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id Cd_WcRXFdYUz for ; Tue, 13 Sep 2016 22:43:52 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id F1F855FB39 for ; Tue, 13 Sep 2016 22:43:50 +0000 (UTC) Received: (qmail 28281 invoked by uid 99); 13 Sep 2016 22:43:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 22:43:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EBF49E07F4; Tue, 13 Sep 2016 22:43:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hiteshkhamesra@apache.org To: commits@geode.incubator.apache.org Date: Tue, 13 Sep 2016 22:43:55 -0000 Message-Id: <3fe28e9a90444419b7f5c40089af3f06@git.apache.org> In-Reply-To: <69ace8383aae47d1b0c2dc7793e01b2a@git.apache.org> References: <69ace8383aae47d1b0c2dc7793e01b2a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/test/java/org/apache/geode) archived-at: Tue, 13 Sep 2016 22:44:00 -0000 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java new file mode 100644 index 0000000..411396b --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static org.junit.Assert.*; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.wan.GatewayEventFilter; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.cache30.MyGatewayEventFilter1; +import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1; +import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.cache.RegionQueue; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { + + @Test + public void testPrimarySecondaryQueueDrainInOrder_RR() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> WANTestBase.createCache(nyPort )); + vm3.invoke(() -> WANTestBase.createCache(nyPort )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(() -> WANTestBase.createCache( lnPort )); + vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, + false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, + false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.addQueueListener( "ln", false)); + vm5.invoke(() -> WANTestBase.addQueueListener( "ln", false)); + + vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR")); + vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR")); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln")); + + vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + Wait.pause(5000); + HashMap primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); + HashMap secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); + assertEquals(primarySenderUpdates, secondarySenderUpdates); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln")); + Wait.pause(2000); + vm4.invoke(() -> WANTestBase.pauseSender( "ln")); + Wait.pause(2000); + // We should wait till primarySenderUpdates and secondarySenderUpdates become same + // If in 300000ms they don't then throw error. + primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); + secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); + + checkPrimarySenderUpdatesOnVM5(primarySenderUpdates); +// assertIndexDetailsEquals(primarySenderUpdates, secondarySenderUpdates); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln")); + Wait.pause(5000); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); + HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkQueue()); + + List destroyList = (List)primarySenderUpdates.get("Destroy"); + List createList = (List)receiverUpdates.get("Create"); + for(int i = 0; i< 1000; i++){ + assertEquals(destroyList.get(i), createList.get(i)); + } + assertEquals(primarySenderUpdates.get("Destroy"), receiverUpdates.get("Create")); + + Wait.pause(5000); + // We expect that after this much time secondary would have got batch removal message + // removing all the keys. + secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); + assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create")); + } + + protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) { + vm5.invoke(() -> WANTestBase.checkQueueOnSecondary( primarySenderUpdates )); + } + + @Test + public void testPrimarySecondaryQueueDrainInOrder_PR() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR")); + vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR")); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, + false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, + false, 100, 10, false, false, null, true,1, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.addQueueListener( "ln", false)); + vm5.invoke(() -> WANTestBase.addQueueListener( "ln", false)); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln")); + + vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + Wait.pause(5000); + HashMap primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); + HashMap secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); + checkPrimarySenderUpdatesOnVM5(primarySenderUpdates); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln")); + Wait.pause(4000); + vm4.invoke(() -> WANTestBase.pauseSender( "ln")); + Wait.pause(15000); + primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); + secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); + assertEquals(primarySenderUpdates, secondarySenderUpdates); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln")); + Wait.pause(5000); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + /** + * Test to validate that serial gateway sender queue diskSynchronous attribute + * when persistence of sender is enabled. + */ + @Test + public void test_ValidateSerialGatewaySenderQueueAttributes_1() { + Integer localLocPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer remoteLocPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort )); + + WANTestBase test = new WANTestBase(getTestMethodName()); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + + localLocPort + "]"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + + File directory = new File("TKSender" + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] { directory }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + DiskStore diskStore = dsf.create("FORNY"); + + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setBatchConflationEnabled(true); + fact.setBatchSize(200); + fact.setBatchTimeInterval(300); + fact.setPersistenceEnabled(true);// enable the persistence + fact.setDiskSynchronous(true); + fact.setDiskStoreName("FORNY"); + fact.setMaximumQueueMemory(200); + fact.setAlertThreshold(1200); + GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); + fact.addGatewayEventFilter(myEventFilter1); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); + fact.addGatewayTransportFilter(myStreamFilter2); + final IgnoredException exTKSender = IgnoredException.addIgnoredException("Could not connect"); + try { + GatewaySender sender1 = fact.create("TKSender", 2); + + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId(sender1.getId()); + factory.setDataPolicy(DataPolicy.PARTITION); + Region region = cache.createRegionFactory(factory.create()).create( + "test_ValidateGatewaySenderAttributes"); + Set senders = cache.getGatewaySenders(); + assertEquals(senders.size(), 1); + GatewaySender gatewaySender = senders.iterator().next(); + Set regionQueues = ((AbstractGatewaySender) gatewaySender) + .getQueues(); + assertEquals(regionQueues.size(), GatewaySender.DEFAULT_DISPATCHER_THREADS); + RegionQueue regionQueue = regionQueues.iterator().next(); + assertEquals(true, regionQueue.getRegion().getAttributes() + .isDiskSynchronous()); + } finally { + exTKSender.remove(); + } + } + + /** + * Test to validate that serial gateway sender queue diskSynchronous attribute + * when persistence of sender is not enabled. + */ + @Test + public void test_ValidateSerialGatewaySenderQueueAttributes_2() { + Integer localLocPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer remoteLocPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort )); + + WANTestBase test = new WANTestBase(getTestMethodName()); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + localLocPort + "]"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setBatchConflationEnabled(true); + fact.setBatchSize(200); + fact.setBatchTimeInterval(300); + fact.setPersistenceEnabled(false);//set persistence to false + fact.setDiskSynchronous(true); + fact.setMaximumQueueMemory(200); + fact.setAlertThreshold(1200); + GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); + fact.addGatewayEventFilter(myEventFilter1); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); + fact.addGatewayTransportFilter(myStreamFilter2); + final IgnoredException exp = IgnoredException.addIgnoredException("Could not connect"); + try { + GatewaySender sender1 = fact.create("TKSender", 2); + + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId(sender1.getId()); + factory.setDataPolicy(DataPolicy.PARTITION); + Region region = cache.createRegionFactory(factory.create()).create( + "test_ValidateGatewaySenderAttributes"); + Set senders = cache.getGatewaySenders(); + assertEquals(senders.size(), 1); + GatewaySender gatewaySender = senders.iterator().next(); + Set regionQueues = ((AbstractGatewaySender) gatewaySender) + .getQueues(); + assertEquals(regionQueues.size(), GatewaySender.DEFAULT_DISPATCHER_THREADS); + RegionQueue regionQueue = regionQueues.iterator().next(); + + assertEquals(false, regionQueue.getRegion().getAttributes() + .isDiskSynchronous()); + } finally { + exp.remove(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java new file mode 100644 index 0000000..7664ab4 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + + +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.Wait; + +/** + * + */ +@Category(DistributedTest.class) +public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends + WANTestBase { + + private static final long serialVersionUID = 1L; + + public SerialWANPersistenceEnabledGatewaySenderDUnitTest() { + super(); + } + + /** + * Just enable the persistence for GatewaySender and see if it remote site + * receives all the events. + */ + @Test + public void testReplicatedRegionWithGatewaySenderPersistenceEnabled() { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + /** + * Enable persistence for the Region and see if the remote site gets all the + * events. + */ + @Test + public void testPersistentReplicatedRegionWithGatewaySender() { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + /** + * Enable persistence for region as well as GatewaySender and see if remote + * site receives all the events. + * + */ + @Test + public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + /** + * Enable persistence for GatewaySender, kill the sender and restart it. Check + * if the remote site receives all the event. + */ + @Test + public void testReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, null, true )); + String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, null, true )); + + LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore); + LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + // verify if the queue has all the events + // vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000 + // )); + // vm5.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000 + // )); + // + // vm2.invoke(() -> WANTestBase.validateRegionSize( + // testName + "_RR", 0 )); + // vm3.invoke(() -> WANTestBase.validateRegionSize( + // testName + "_RR", 0 )); + + // kill the vm + vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + vm6.invoke(() -> WANTestBase.killSender()); + vm7.invoke(() -> WANTestBase.killSender()); + + LogWriterUtils.getLogWriter().info("Killed all the sender. "); + // restart the vm + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( + "ln", 2, false, 100, 10, false, true, null, + firstDStore, true )); + LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 "); + vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( + "ln", 2, false, 100, 10, false, true, null, + secondDStore, true )); + LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 "); + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); + LogWriterUtils.getLogWriter().info("Started the sender in vm 4"); + + vm5.invoke(() -> WANTestBase.startSender( "ln" )); + LogWriterUtils.getLogWriter().info("Started the sender in vm 5"); + try { + inv1.join(); + } catch (InterruptedException e) { + fail("Got interrupted exception while waiting for startSender to finish."); + } + + Wait.pause(5000); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + /** + * Enable persistence for Region and persistence for GatewaySender. Kill the + * vm with regions and bring that up again. Check if the remote site receives + * all the event. again? + * + */ + @Test + public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, null, true )); + String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, null, true )); + + LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore); + LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + // kill the vm + vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + + LogWriterUtils.getLogWriter().info("Killed the sender. "); + // restart the vm + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, firstDStore, true )); + LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 "); + vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, secondDStore, true )); + LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 "); + + vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); + LogWriterUtils.getLogWriter().info("Started the sender in vm 4"); + + vm5.invoke(() -> WANTestBase.startSender( "ln" )); + LogWriterUtils.getLogWriter().info("Started the sender in vm 5"); + try { + inv1.join(); + } catch (InterruptedException e) { + fail("Got interrupted exception while waiting for startSender to finish."); + } + + Wait.pause(5000); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + /** + * Enable persistence for Region. No persistence for GatewaySender. Kill the + * vm with regions and bring that up again. Check if the remote site receives + * all the event. again? + * + */ + @Test + public void testPersistentReplicatedRegionWithGatewaySender_Restart() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, + 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, false, + 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + // verify if the queue has all the events + // vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000 + // )); + // vm5.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000 + // )); + // + // vm2.invoke(() -> WANTestBase.validateRegionSize( + // testName + "_RR", 0 )); + // vm3.invoke(() -> WANTestBase.validateRegionSize( + // testName + "_RR", 0 )); + + // kill the vm + vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + + LogWriterUtils.getLogWriter().info("Killed the sender. "); + // restart the vm + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(() -> WANTestBase.createSender( + "ln", 2, false, 100, 10, false, false, null, true)); + LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 "); + vm5.invoke(() -> WANTestBase.createSender( + "ln", 2, false, 100, 10, false, false, null, true)); + LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 "); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + LogWriterUtils.getLogWriter().info("Started the sender in vm 4"); + + vm5.invoke(() -> WANTestBase.startSender( "ln" )); + LogWriterUtils.getLogWriter().info("Started the sender in vm 5"); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + try { + inv1.join(); + } catch (InterruptedException e) { + fail("Got interrupted exception while waiting for startSender to finish."); + } + + Wait.pause(5000); + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + + /** + * Enable persistence for Region and persistence for GatewaySender. Kill the + * vm with regions and bring that up again. Check if the remote site receives + * all the event. again? + * In this case put is continuously happening while the vm is down. + */ + @Test + public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart2() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, null, true )); + String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, null, true )); + + LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore); + LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + // kill the vm + vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + + LogWriterUtils.getLogWriter().info("Killed the sender. "); + // restart the vm + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, firstDStore, true )); + LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 "); + vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, + 100, 10, false, true, null, secondDStore, true )); + LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 "); + + vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); + LogWriterUtils.getLogWriter().info("Started the sender in vm 4"); + + vm5.invoke(() -> WANTestBase.startSender( "ln" )); + LogWriterUtils.getLogWriter().info("Started the sender in vm 5"); + try { + inv1.join(); + } catch (InterruptedException e) { + fail("Got interrupted exception while waiting for startSender to finish."); + } + + Wait.pause(5000); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } +}