Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2D01A200B67 for ; Tue, 16 Aug 2016 23:34:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2B708160AA8; Tue, 16 Aug 2016 21:34:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F1A04160A74 for ; Tue, 16 Aug 2016 23:34:05 +0200 (CEST) Received: (qmail 5647 invoked by uid 500); 16 Aug 2016 21:34:05 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 5638 invoked by uid 99); 16 Aug 2016 21:34:05 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2016 21:34:05 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id A4A451A561E for ; Tue, 16 Aug 2016 21:34:04 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id iGGUdG8zkCSk for ; Tue, 16 Aug 2016 21:33:56 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 1EE8E60D5C for ; Tue, 16 Aug 2016 21:33:50 +0000 (UTC) Received: (qmail 3745 invoked by uid 99); 16 Aug 2016 21:33:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2016 21:33:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E962EEF423; Tue, 16 Aug 2016 21:33:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dschneider@apache.org To: commits@geode.incubator.apache.org Date: Tue, 16 Aug 2016 21:34:02 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/24] incubator-geode git commit: GEODE-1241: Fixed the misspelt names in Geode WAN module archived-at: Tue, 16 Aug 2016 21:34:08 -0000 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java new file mode 100644 index 0000000..cfe4169 --- /dev/null +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +/** + * All the test cases are similar to SerialWANPropagationDUnitTest except that + * the we create concurrent serial GatewaySender with concurrency of 4 + */ +@Category(DistributedTest.class) +public class ConcurrentWANPropagation_2_DUnitTest extends WANTestBase { + + public ConcurrentWANPropagation_2_DUnitTest() { + super(); + } + + private static final long serialVersionUID = 1L; + + @Test + public void testSerialReplicatedWanWithOverflow() { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //keep the maxQueueMemory low enough to trigger eviction + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 10, 5, false, false, null, true, 5, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 10, 5, false, false, null, true, 5, OrderPolicy.KEY )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + vm2.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); + vm3.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doHeavyPuts( + getTestMethodName() + "_RR", 15 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 15, 240000)); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 15, 240000 )); + } + + @Ignore("Bug46921") + @Test + public void testSerialReplicatedWanWithPersistence() { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + @Test + public void testReplicatedSerialPropagationToTwoWanSites() throws Exception { + + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + + createCacheInVMs(nyPort, vm2); + createCacheInVMs(tkPort, vm3); + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true, 5 , OrderPolicy.THREAD)); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("lnSerial1", vm4, vm5); + + startSenderInVMs("lnSerial2", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testReplicatedSerialPropagationHA() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); + Wait.pause(2000); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + + inv1.join(); + inv2.join(); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + } + + @Test + public void testReplicatedSerialPropagationWithConflation() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testReplicatedSerialPropagationWithParallelThreads() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( + getTestMethodName() + "_RR", 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testSerialPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + } + + @Test + public void testReplicatedSerialPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + } + + @Test + public void testNormalRegionSerialPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> WANTestBase.createCache(nyPort)); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + WANTestBase.createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createNormalRegion( + getTestMethodName() + "_RR", "ln" )); + vm5.invoke(() -> WANTestBase.createNormalRegion( + getTestMethodName() + "_RR", "ln" )); + + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, + 0, 0, 0)); + + vm5.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, + 1000, 0, 0 )); + + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000)); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0 )); + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java deleted file mode 100644 index c01cb22..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java +++ /dev/null @@ -1,568 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.cache.CacheException; -import com.gemstone.gemfire.cache.EntryExistsException; -import com.gemstone.gemfire.cache.client.ServerOperationException; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.cache30.CacheSerializableRunnable; -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.internal.cache.wan.BatchException70; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; - -/** - * All the test cases are similar to SerialWANPropogationDUnitTest except that - * the we create concurrent serial GatewaySender with concurrency of 4 - * - */ -@Category(DistributedTest.class) -public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { - - /** - * @param name - */ - public ConcurrentWANPropogation_1_DUnitTest() { - super(); - } - - private static final long serialVersionUID = 1L; - - /** - * All the test cases are similar to SerialWANPropogationDUnitTest - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagation_withoutRemoteSite() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //keep the batch size high enough to reduce the number of exceptions in the log - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - createCacheInVMs(nyPort, vm2, vm3); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - - @Test - public void testReplicatedSerialPropagationWithLocalSiteClosedAndRebuilt() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - //---------close local site and build again----------------------------------------- - vm4.invoke(() -> WANTestBase.killSender( )); - vm5.invoke(() -> WANTestBase.killSender( )); - vm6.invoke(() -> WANTestBase.killSender( )); - vm7.invoke(() -> WANTestBase.killSender( )); - - Integer regionSize = - (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_RR" )); - LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - IgnoredException.addIgnoredException(EntryExistsException.class.getName()); - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - //---------------------------------------------------------------------------------- - - //verify remote site receives all the events - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - /** - * Two regions configured with the same sender and put is in progress - * on both the regions. - * One of the two regions is destroyed in the middle. - * - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagationWithLocalRegionDestroy() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 20, false, false, null, true, 3, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 20, false, false, null, true ,3, OrderPolicy.THREAD)); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - //create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - //do puts in RR_2 in main thread - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 500 )); - //destroy RR_2 after above puts are complete - vm4.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); - - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - //sleep for some time to let all the events propagate to remote site - Thread.sleep(20); - //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_2", 500 )); - } - - /** - * 1 region and sender configured on local site and 1 region and a - * receiver configured on remote site. Puts to the local region are in progress. - * Remote region is destroyed in the middle. - * - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 500, false, false, null, true, 5, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 500, false, false, null, true, 5, OrderPolicy.KEY )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 10000 )); - //destroy RR_1 in remote site - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_1")); - - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - - //verify that all is well in local site. All the events should be present in local region - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 10000 )); - //assuming some events might have been dispatched before the remote region was destroyed, - //sender's region queue will have events less than 1000 but the queue will not be empty. - //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of - //more in depth validations. - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmptyForConcurrentSender("ln" )); - } - - /** - * Two regions configured in local with the same sender and put is in progress - * on both the regions. Same two regions are configured on remote site as well. - * One of the two regions is destroyed in the middle on remote site. - * - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy2() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - //create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - //destroy RR_2 on remote site in the middle - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); - - //expected exceptions in the logs - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - //start puts in RR_2 in another thread - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - //though region RR_2 is destroyed, RR_1 should still get all the events put in it - //in local site - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - - } - - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy3() - throws Exception { - final String senderId = "ln"; - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - // these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - // these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - // senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( senderId, 2, - false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( senderId, 2, - false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - - // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - // create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - - // start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", senderId, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", senderId, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", senderId, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", senderId, isOffHeap() )); - - // create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", senderId, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", senderId, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", senderId, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", senderId, isOffHeap() )); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - // start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - // start puts in RR_2 in another thread - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); - // destroy RR_2 on remote site in the middle - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() - + "_RR_2" )); - - try { - inv1.join(); - inv2.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - // though region RR_2 is destroyed, RR_1 should still get all the events put - // in it - // in local site - try { - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - } finally { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - vm4.invoke(new CacheSerializableRunnable("UnSetting system property ") { - public void run2() throws CacheException { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - } - }); - - vm5.invoke(new CacheSerializableRunnable("UnSetting system property ") { - public void run2() throws CacheException { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - } - }); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java deleted file mode 100644 index 06e0ecb..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.Ignore; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -/** - * All the test cases are similar to SerialWANPropogationDUnitTest except that - * the we create concurrent serial GatewaySender with concurrency of 4 - */ -@Category(DistributedTest.class) -public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { - - public ConcurrentWANPropogation_2_DUnitTest() { - super(); - } - - private static final long serialVersionUID = 1L; - - @Test - public void testSerialReplicatedWanWithOverflow() { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //keep the maxQueueMemory low enough to trigger eviction - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 10, 5, false, false, null, true, 5, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 10, 5, false, false, null, true, 5, OrderPolicy.KEY )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - vm2.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); - vm3.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doHeavyPuts( - getTestMethodName() + "_RR", 15 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 15, 240000)); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 15, 240000 )); - } - - @Ignore("Bug46921") - @Test - public void testSerialReplicatedWanWithPersistence() { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - @Test - public void testReplicatedSerialPropagationToTwoWanSites() throws Exception { - - Integer lnPort = createFirstLocatorWithDSId(1); - Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - createCacheInVMs(nyPort, vm2); - createCacheInVMs(tkPort, vm3); - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true, 5 , OrderPolicy.THREAD)); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("lnSerial1", vm4, vm5); - - startSenderInVMs("lnSerial2", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagationHA() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); - Wait.pause(2000); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - - inv1.join(); - inv2.join(); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - } - - @Test - public void testReplicatedSerialPropagationWithConflation() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagationWithParallelThreads() - throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( - getTestMethodName() + "_RR", 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testSerialPropogationWithFilter() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - } - - @Test - public void testReplicatedSerialPropagationWithFilter() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - } - - @Test - public void testNormalRegionSerialPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> WANTestBase.createCache(nyPort)); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - WANTestBase.createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createNormalRegion( - getTestMethodName() + "_RR", "ln" )); - vm5.invoke(() -> WANTestBase.createNormalRegion( - getTestMethodName() + "_RR", "ln" )); - - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, - 0, 0, 0)); - - vm5.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, - 1000, 0, 0 )); - - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000)); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0)); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0 )); - - vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0)); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java index 6ad485f..9c6cbdd 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java @@ -60,7 +60,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { try { vm4.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - fail("Expected IllegateStateException : cannot have the same parallel gateway sender"); + fail("Expected IllegalStateException : cannot have the same parallel gateway sender"); } catch (Exception e) { if (!(e.getCause() instanceof IllegalStateException) @@ -76,7 +76,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { * @throws Exception * Below test is disabled intentionally 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about ParallelGatewaySenderQueue#convertPathToName 3> We have to enabled it in next release 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 @@ -158,7 +158,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { * The PGS is persistence enabled but not the Regions * Below test is disabled intentionally 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about ParallelGatewaySenderQueue#convertPathToName 3> We have to enabled it in next release 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 @@ -245,7 +245,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { * Check if the remote site receives all the events. * Below test is disabled intentionally 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about ParallelGatewaySenderQueue#convertPathToName 3> We have to enabled it in next release 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java index b396e85..8b8c624 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java @@ -138,7 +138,7 @@ public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase { //wait for vm1 to propagate destroyed entry's new version tag to vm5 Wait.pause(2000); - vm5.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.verifyTimestampAfterOp(destroyTimeStamp, 1 /* ds 3 receives gatway event only from ds 1*/)); + vm5.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.verifyTimestampAfterOp(destroyTimeStamp, 1 /* ds 3 receives gateway event only from ds 1*/)); } /** @@ -362,7 +362,7 @@ public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase { * version tag is generated in local distributed system. */ @Test - public void testConflicChecksBasedOnDsidAndTimeStamp() { + public void testConflictChecksBasedOnDsidAndTimeStamp() { // create two distributed systems with each having a cache containing http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java index 0f76ce8..17c76ed 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java @@ -90,7 +90,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{ * and can deserialize entries. */ @Test - public void testWANPDX_RemoveRomoteData() { + public void testWANPDX_RemoveRemoteData() { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f37769c/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java index e2810f1..1981feb 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java @@ -22,8 +22,6 @@ import org.junit.Test; import static org.junit.Assert.*; -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; import com.gemstone.gemfire.test.junit.categories.DistributedTest; import com.gemstone.gemfire.cache.DataPolicy; @@ -45,7 +43,7 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas /**Below test is disabled intentionally 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about ParallelGatewaySenderQueue#convertPathToName 3> We have to enabled it in next release 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 @@ -185,7 +183,7 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas /**Below test is disabled intentionally 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about ParallelGatewaySenderQueue#convertPathToName 3> We have to enabled it in next release 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 @@ -338,7 +336,7 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas /**Below test is disabled intentionally 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about ParallelGatewaySenderQueue#convertPathToName 3> We have to enabled it in next release 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 @@ -554,7 +552,7 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas /**Below test is disabled intentionally 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about ParallelGatewaySenderQueue#convertPathToName 3> We have to enabled it in next release 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 @@ -613,11 +611,11 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas Thread.sleep(60000); { - AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropogationDUnitTest.doPuts0( getTestMethodName() + "_RR", 10000 )); + AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0(getTestMethodName() + "_RR", 10000 )); Thread.sleep(1000); AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); Thread.sleep(2000); - AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropogationDUnitTest.doPuts1( getTestMethodName() + "_RR", 10000 )); + AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts1(getTestMethodName() + "_RR", 10000 )); Thread.sleep(1500); AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); try { @@ -647,7 +645,7 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); - AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropogationDUnitTest.doPuts2( getTestMethodName() + "_RR", 15000 )); + AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts2(getTestMethodName() + "_RR", 15000 )); try { inv1.join(); inv2.join();