Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A1248200B92 for ; Wed, 14 Sep 2016 00:44:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9FADD160AD3; Tue, 13 Sep 2016 22:44:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2C9E0160AD8 for ; Wed, 14 Sep 2016 00:44:16 +0200 (CEST) Received: (qmail 31415 invoked by uid 500); 13 Sep 2016 22:44:15 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 31406 invoked by uid 99); 13 Sep 2016 22:44:15 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 22:44:15 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id AFD14CEF43 for ; Tue, 13 Sep 2016 22:44:14 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.344 X-Spam-Level: X-Spam-Status: No, score=-4.344 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.124] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id GnKHS1TMEerR for ; Tue, 13 Sep 2016 22:44:08 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 3C3EA60E31 for ; Tue, 13 Sep 2016 22:43:50 +0000 (UTC) Received: (qmail 28296 invoked by uid 99); 13 Sep 2016 22:43:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 22:43:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13159E3607; Tue, 13 Sep 2016 22:43:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hiteshkhamesra@apache.org To: commits@geode.incubator.apache.org Date: Tue, 13 Sep 2016 22:43:59 -0000 Message-Id: In-Reply-To: <69ace8383aae47d1b0c2dc7793e01b2a@git.apache.org> References: <69ace8383aae47d1b0c2dc7793e01b2a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/test/java/org/apache/geode) archived-at: Tue, 13 Sep 2016 22:44:18 -0000 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest.java new file mode 100644 index 0000000..5e32bb6 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.offheap; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.internal.cache.wan.serial.SerialWANPersistenceEnabledGatewaySenderDUnitTest; + +@SuppressWarnings("serial") +@Category(DistributedTest.class) +public class SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest extends + SerialWANPersistenceEnabledGatewaySenderDUnitTest { + + public SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest() { + super(); + } + + @Override + public boolean isOffHeap() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPropagationOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPropagationOffHeapDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPropagationOffHeapDUnitTest.java new file mode 100644 index 0000000..d6cecd1 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPropagationOffHeapDUnitTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.offheap; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.internal.cache.wan.serial.SerialWANPropagationDUnitTest; + +@SuppressWarnings("serial") +@Category(DistributedTest.class) +public class SerialWANPropagationOffHeapDUnitTest extends SerialWANPropagationDUnitTest { + + public SerialWANPropagationOffHeapDUnitTest() { + super(); + } + + @Override + public boolean isOffHeap() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPropagation_PartitionedRegionOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPropagation_PartitionedRegionOffHeapDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPropagation_PartitionedRegionOffHeapDUnitTest.java new file mode 100644 index 0000000..4abd6e6 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/offheap/SerialWANPropagation_PartitionedRegionOffHeapDUnitTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.offheap; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.internal.cache.wan.serial.SerialWANPropagation_PartitionedRegionDUnitTest; + +@SuppressWarnings("serial") +@Category(DistributedTest.class) +public class SerialWANPropagation_PartitionedRegionOffHeapDUnitTest + extends SerialWANPropagation_PartitionedRegionDUnitTest { + + public SerialWANPropagation_PartitionedRegionOffHeapDUnitTest() { + super(); + } + + @Override + public boolean isOffHeap() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperation_2_DUnitTest.java new file mode 100644 index 0000000..e0f29db --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperation_2_DUnitTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.internal.cache.wan.concurrent.ConcurrentParallelGatewaySenderOperation_2_DUnitTest; +import com.gemstone.gemfire.test.dunit.VM; + +@Category(DistributedTest.class) +public class ParallelGatewaySenderOperation_2_DUnitTest extends ConcurrentParallelGatewaySenderOperation_2_DUnitTest { + + private static final long serialVersionUID = 1L; + + public ParallelGatewaySenderOperation_2_DUnitTest() { + super(); + } + + protected void createSender(VM vm, int concurrencyLevel, boolean manualStart) { + vm.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, manualStart)); + } + + protected void createSenders(VM vm, int concurrencyLevel) { + vm.invoke(() -> createSender("ln1", 2, true, 100, 10, false, false, null, true)); + vm.invoke(() -> createSender("ln2", 3, true, 100, 10, false, false, null, true)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java new file mode 100644 index 0000000..4981c97 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -0,0 +1,692 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + +import static com.gemstone.gemfire.test.dunit.Assert.*; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.GemFireIOException; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.internal.cache.tier.sockets.Message; +import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.RMIException; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +/** + * DUnit test for operations on ParallelGatewaySender + */ +@Category(DistributedTest.class) +public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { + + @Override + protected final void postSetUpWANTestBase() throws Exception { + IgnoredException.addIgnoredException("Broken pipe||Unexpected IOException"); + } + + @Test + public void testParallelGatewaySenderWithoutStarting() { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, false); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + vm4.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); + vm5.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); + vm6.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); + vm7.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); + + validateRegionSizes(getTestMethodName() + "_PR", 0, vm2, vm3); + } + + /** + * Defect 44323 (ParallelGatewaySender should not be started on Accessor node) + */ + @Test + public void testParallelGatewaySenderStartOnAccessorNode() { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true); + + Wait.pause(2000); + + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10 )); + + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + validateRegionSizes(getTestMethodName() + "_PR", 10, vm2, vm3); + } + + /** + * Normal scenario in which the sender is paused in between. + */ + @Test + public void testParallelPropagationSenderPause() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); + + //make sure all the senders are running before doing any puts + waitForSendersRunning(); + + //FIRST RUN: now, the senders are started. So, start the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + //now, pause all of the senders + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //SECOND RUN: keep one thread doing puts to the region + vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //verify region size remains on remote vm and is restricted below a specified limit (i.e. number of puts in the first run) + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 )); + } + + /** + * Normal scenario in which a paused sender is resumed. + */ + @Test + public void testParallelPropagationSenderResume() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); + + //make sure all the senders are running before doing any puts + waitForSendersRunning(); + + //now, the senders are started. So, start the puts + vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //now, pause all of the senders + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //sleep for a second or two + Wait.pause(2000); + + //resume the senders + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + + Wait.pause(2000); + + validateParallelSenderQueueAllBucketsDrained(); + + //find the region size on remote vm + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); + } + + /** + * Negative scenario in which a sender that is stopped (and not paused) is resumed. + * Expected: resume is only valid for pause. If a sender which is stopped is resumed, + * it will not be started again. + */ + @Test + public void testParallelPropagationSenderResumeNegativeScenario() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + //wait till the senders are running + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + //start the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + //let the queue drain completely + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); + + //stop the senders + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + + //now, try to resume a stopped sender + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + + //do more puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //validate region size on remote vm to contain only the events put in local site + //before the senders are stopped. + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100 )); + } + + /** + * Normal scenario in which a sender is stopped. + */ + @Test + public void testParallelPropagationSenderStop() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); + + //make sure all the senders are running before doing any puts + waitForSendersRunning(); + + //FIRST RUN: now, the senders are started. So, do some of the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + //now, stop all of the senders + stopSenders(); + + //SECOND RUN: keep one thread doing puts + vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //verify region size remains on remote vm and is restricted below a specified limit (number of puts in the first run) + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 )); + } + + /** + * Normal scenario in which a sender is stopped and then started again. + */ + @Test + public void testParallelPropagationSenderStartAfterStop() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + String regionName = getTestMethodName() + "_PR"; + + + createCacheInVMs(nyPort, vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + + createReceiverInVMs(vm2, vm3); + + vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + vm6.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + vm7.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + + vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + //make sure all the senders are running before doing any puts + vm4.invoke(() -> waitForSenderRunningState("ln")); + vm5.invoke(() -> waitForSenderRunningState("ln")); + vm6.invoke(() -> waitForSenderRunningState("ln")); + vm7.invoke(() -> waitForSenderRunningState("ln")); + + //FIRST RUN: now, the senders are started. So, do some of the puts + vm4.invoke(() -> WANTestBase.doPuts( regionName, 200 )); + + //now, stop all of the senders + vm4.invoke(() -> stopSender("ln")); + vm5.invoke(() -> stopSender("ln")); + vm6.invoke(() -> stopSender("ln")); + vm7.invoke(() -> stopSender("ln")); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(regionName, 200 )); + + //SECOND RUN: do some of the puts after the senders are stopped + vm4.invoke(() -> WANTestBase.doPuts( regionName, 1000 )); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(regionName, 200 )); + + //start the senders again + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> waitForSenderRunningState("ln")); + vm5.invoke(() -> waitForSenderRunningState("ln")); + vm6.invoke(() -> waitForSenderRunningState("ln")); + vm7.invoke(() -> waitForSenderRunningState("ln")); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(regionName, 200 )); + + //SECOND RUN: do some more puts + AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( regionName, 1000 )); + async.join(); + + //verify all the buckets on all the sender nodes are drained + validateParallelSenderQueueAllBucketsDrained(); + + //verify the events propagate to remote site + vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, 1000 )); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + vm6.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + vm7.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + } + + /** + * Normal scenario in which a sender is stopped and then started again. + * Differs from above test case in the way that when the sender is starting from + * stopped state, puts are simultaneously happening on the region by another thread. + */ + @Test + public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); + + //make sure all the senders are running before doing any puts + waitForSendersRunning(); + + LogWriterUtils.getLogWriter().info("All the senders are now started"); + + //FIRST RUN: now, the senders are started. So, do some of the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); + + LogWriterUtils.getLogWriter().info("Done few puts"); + + //Make sure the puts make it to the remote side + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200, 120000)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200, 120000)); + + //now, stop all of the senders + stopSenders(); + + LogWriterUtils.getLogWriter().info("All the senders are stopped"); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200, 120000)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200, 120000)); + + //SECOND RUN: do some of the puts after the senders are stopped + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + LogWriterUtils.getLogWriter().info("Done some more puts in second run"); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); + + //SECOND RUN: start async puts on region + AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); + LogWriterUtils.getLogWriter().info("Started high number of puts by async thread"); + + LogWriterUtils.getLogWriter().info("Starting the senders at the same time"); + //when puts are happening by another thread, start the senders + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + LogWriterUtils.getLogWriter().info("All the senders are started"); + + async.join(); + + //verify all the buckets on all the sender nodes are drained + validateParallelSenderQueueAllBucketsDrained(); + + //verify that the queue size ultimately becomes zero. That means all the events propagate to remote site. + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); + } + + /** + * Normal scenario in which a sender is stopped and then started again on accessor node. + */ + @Test + public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true); + + //make sure all the senders are not running on accessor nodes and running on non-accessor nodes + waitForSendersRunning(); + + //FIRST RUN: now, the senders are started. So, do some of the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); + + //now, stop all of the senders + stopSenders(); + + Wait.pause(2000); + + //SECOND RUN: do some of the puts after the senders are stopped + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); + + //start the senders again + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); + + //SECOND RUN: do some more puts + AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + async.join(); + Wait.pause(5000); + + //verify all buckets drained only on non-accessor nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + //verify the events propagate to remote site + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); + } + + /** + * Normal scenario in which a combinations of start, pause, resume operations + * is tested + */ + @Test + public void testStartPauseResumeParallelGatewaySender() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + LogWriterUtils.getLogWriter().info("Done 1000 puts on local site"); + + //Since puts are already done on userPR, it will have the buckets created. + //During sender start, it will wait until those buckets are created for shadowPR as well. + //Start the senders in async threads, so colocation of shadowPR will be complete and + //missing buckets will be created in PRHARedundancyProvider.createMissingBuckets(). + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + waitForSendersRunning(); + + LogWriterUtils.getLogWriter().info("Started senders on local site"); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); + LogWriterUtils.getLogWriter().info("Done 5000 puts on local site"); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + LogWriterUtils.getLogWriter().info("Paused senders on local site"); + + vm4.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); + vm5.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); + vm6.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); + vm7.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + LogWriterUtils.getLogWriter().info("Started 1000 async puts on local site"); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + LogWriterUtils.getLogWriter().info("Resumed senders on local site"); + + vm4.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); + vm5.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); + vm6.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); + vm7.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); + + try { + inv1.join(); + } catch (InterruptedException e) { + fail("Interrupted the async invocation.", e); + } + + //verify all buckets drained on all sender nodes. + validateParallelSenderQueueAllBucketsDrained(); + + validateRegionSizes(getTestMethodName() + "_PR", 5000, vm2, vm3); + } + + /** + * Since the sender is attached to a region and in use, it can not be + * destroyed. Hence, exception is thrown by the sender API. + */ + @Test + public void testDestroyParallelGatewaySenderExceptionScenario() { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); + + // make sure all the senders are running before doing any puts + waitForSendersRunning(); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + // try destroying on couple of nodes + try { + vm4.invoke(() -> WANTestBase.destroySender( "ln" )); + } + catch (RMIException e) { + assertTrue("Cause of the exception should be GatewaySenderException", e + .getCause() instanceof GatewaySenderException); + } + try { + vm5.invoke(() -> WANTestBase.destroySender( "ln" )); + } + catch (RMIException e) { + assertTrue("Cause of the exception should be GatewaySenderException", e + .getCause() instanceof GatewaySenderException); + } + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testDestroyParallelGatewaySender() { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); + + // make sure all the senders are running + waitForSendersRunning(); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + Wait.pause(2000); + + //stop the sender and remove from region before calling destroy on it + stopSenders(); + + vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( + "ln", getTestMethodName() + "_PR" )); + vm5.invoke(() -> WANTestBase.removeSenderFromTheRegion( + "ln", getTestMethodName() + "_PR" )); + vm6.invoke(() -> WANTestBase.removeSenderFromTheRegion( + "ln", getTestMethodName() + "_PR" )); + vm7.invoke(() -> WANTestBase.removeSenderFromTheRegion( + "ln", getTestMethodName() + "_PR" )); + + vm4.invoke(() -> WANTestBase.destroySender( "ln" )); + vm5.invoke(() -> WANTestBase.destroySender( "ln" )); + vm6.invoke(() -> WANTestBase.destroySender( "ln" )); + vm7.invoke(() -> WANTestBase.destroySender( "ln" )); + + vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); + vm5.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); + vm6.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); + vm7.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); + } + + @Test + public void testParallelGatewaySenderMessageTooLargeException() { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + // Create and start sender with reduced maximum message size and 1 dispatcher thread + String regionName = getTestMethodName() + "_PR"; + vm4.invoke(() -> setMaximumMessageSize( 1024*1024 )); + vm4.invoke(() -> createCache( lnPort )); + vm4.invoke(() -> setNumDispatcherThreadsForTheRun( 1 )); + vm4.invoke(() -> createSender( "ln", 2, true, 100, 100, false, false, null, false )); + vm4.invoke(() -> createPartitionedRegion( regionName, "ln", 0, 100, isOffHeap() )); + + // Do puts + int numPuts = 200; + vm4.invoke(() -> doPuts( regionName, numPuts, new byte[11000] )); + validateRegionSizes(regionName, numPuts, vm4); + + // Start receiver + IgnoredException ignoredMTLE = IgnoredException.addIgnoredException(MessageTooLargeException.class.getName(), vm4); + IgnoredException ignoredGIOE = IgnoredException.addIgnoredException(GemFireIOException.class.getName(), vm4); + vm2.invoke(() -> createCache( nyPort )); + vm2.invoke(() -> createPartitionedRegion( regionName, null, 0, 100, isOffHeap() )); + vm2.invoke(() -> createReceiver()); + validateRegionSizes( regionName, numPuts, vm2 ); + + vm4.invoke(() -> { + final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); + assertTrue(sender.getStatistics().getBatchesResized() > 0); + }); + ignoredMTLE.remove(); + ignoredGIOE.remove(); + } + + private void setMaximumMessageSize(int maximumMessageSizeBytes) { + Message.MAX_MESSAGE_SIZE = maximumMessageSizeBytes; + LogWriterUtils.getLogWriter() + .info("Set gemfire.client.max-message-size: " + System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size")); + } + + private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, boolean createAccessors, + boolean startSenders) { + // Note: This is a test-specific method used by several test to create + // receivers, senders and partitioned regions. + createSendersAndReceivers(lnPort, nyPort); + + createPartitionedRegions(createAccessors); + + if (startSenders) { + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + } + } + + private void createSendersAndReceivers(Integer lnPort, Integer nyPort) { + // Note: This is a test-specific method used by several test to create + // receivers and senders. + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); + } + + private void createPartitionedRegions(boolean createAccessors) { + // Note: This is a test-specific method used by several test to create + // partitioned regions. + String regionName = getTestMethodName() + "_PR"; + vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + + if (createAccessors) { + vm6.invoke(() -> createPartitionedRegionAsAccessor(regionName, "ln", 1, 100)); + vm7.invoke(() -> createPartitionedRegionAsAccessor(regionName, "ln", 1, 100)); + } else { + vm6.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + vm7.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + } + + vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); + } + + private void stopSenders() { + vm4.invoke(() -> stopSender("ln")); + vm5.invoke(() -> stopSender("ln")); + vm6.invoke(() -> stopSender("ln")); + vm7.invoke(() -> stopSender("ln")); + } + + private void waitForSendersRunning() { + vm4.invoke(() -> waitForSenderRunningState("ln")); + vm5.invoke(() -> waitForSenderRunningState("ln")); + vm6.invoke(() -> waitForSenderRunningState("ln")); + vm7.invoke(() -> waitForSenderRunningState("ln")); + } + + private void validateParallelSenderQueueAllBucketsDrained() { + vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java new file mode 100644 index 0000000..a248520 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static org.junit.Assert.*; + +import java.io.File; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.wan.GatewayEventFilter; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.cache30.MyGatewayEventFilter1; +import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1; +import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.cache.RegionQueue; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import com.jayway.awaitility.Awaitility; + +/** + * DUnit for ParallelSenderQueue overflow operations. + */ +@Category(DistributedTest.class) +public class ParallelGatewaySenderQueueOverflowDUnitTest extends WANTestBase { + + @Test + public void testParallelSenderQueueEventsOverflow_NoDiskStoreSpecified() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true )); + vm5.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true )); + vm6.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true )); + vm7.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //give some time for the senders to pause + Wait.pause(1000); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + int numEventPuts = 50; + vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), numEventPuts )); + + + //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk. + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(()-> + { + long numOvVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + + long numMemVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + + LogWriterUtils.getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7); + LogWriterUtils.getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7); + long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; + assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55)); + + long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7; + //expected is twice the number of events put due to redundancy level of 1 + assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory)); + + }); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50, 240000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50, 240000 )); + } + + /** + * Keep same max memory limit for all the VMs + */ + @Ignore("TODO: test is disabled") + @Test + public void testParallelSenderQueueEventsOverflow() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //give some time for the senders to pause + Wait.pause(1000); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + int numEventPuts = 50; + vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), numEventPuts )); + + long numOvVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + + long numMemVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + + LogWriterUtils.getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7); + LogWriterUtils.getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7); + + long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; + //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk. + assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55)); + + long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7; + //expected is twice the number of events put due to redundancy level of 1 + assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory)); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 )); + } + + /** + * Set a different memory limit for each VM and make sure that all the VMs are utilized to + * full extent of available memory. + */ + @Ignore("TODO: test is disabled") + @Test + public void testParallelSenderQueueEventsOverflow_2() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 5, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 5, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 20, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //give some time for the senders to pause + Wait.pause(1000); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + int numEventPuts = 50; + vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), numEventPuts )); + + long numOvVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + + long numMemVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + + LogWriterUtils.getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7); + LogWriterUtils.getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7); + + long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; + //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk. + assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55)); + + long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7; + //expected is twice the number of events put due to redundancy level of 1 + assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory)); + + //assert the numbers for each VM + assertTrue("Number of entries in memory VM4 is incorrect. Should be less than 10", (numMemVm4 < 10)); + assertTrue("Number of entries in memory VM5 is incorrect. Should be less than 5", (numMemVm5 < 5)); + assertTrue("Number of entries in memory VM6 is incorrect. Should be less than 5", (numMemVm6 < 5)); + assertTrue("Number of entries in memory VM7 is incorrect. Should be less than 20", (numMemVm7 < 20)); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 )); + } + + @Ignore("TODO: test is disabled") + @Test + public void testParallelSenderQueueNoEventsOverflow() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //give some time for the senders to pause + Wait.pause(1000); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + int numEventPuts = 15; + vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), numEventPuts )); + + long numOvVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + long numOvVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); + + long numMemVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + long numMemVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); + + LogWriterUtils.getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7); + LogWriterUtils.getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7); + + long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; + //all 30 (considering redundant copies) events should accommodate in 40 MB space given to 4 senders + assertEquals("Total number of entries overflown to disk is incorrect", 0, totalOverflown); + + long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7; + //expected is twice the number of events put due to redundancy level of 1 + assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory)); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 15 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 15 )); + } + + /** + * Test to validate that ParallelGatewaySenderQueue diskSynchronous attribute + * when persistence of sender is enabled. + */ + @Ignore("TODO: test is disabled") + @Test + public void test_ValidateParallelGatewaySenderQueueAttributes_1() { + Integer localLocPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer remoteLocPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort )); + + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + localLocPort + "]"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + + File directory = new File("TKSender" + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] { directory }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + DiskStore diskStore = dsf.create("FORNY"); + + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setParallel(true);//set parallel to true + fact.setBatchConflationEnabled(true); + fact.setBatchSize(200); + fact.setBatchTimeInterval(300); + fact.setPersistenceEnabled(true);//enable the persistence + fact.setDiskSynchronous(true); + fact.setDiskStoreName("FORNY"); + fact.setMaximumQueueMemory(200); + fact.setAlertThreshold(1200); + GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); + fact.addGatewayEventFilter(myEventFilter1); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); + fact.addGatewayTransportFilter(myStreamFilter2); + final IgnoredException exTKSender = IgnoredException.addIgnoredException("Could not connect"); + try { + GatewaySender sender1 = fact.create("TKSender", 2); + + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId(sender1.getId()); + factory.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + Region region = cache.createRegionFactory(factory.create()).create( + "test_ValidateGatewaySenderAttributes"); + Set senders = cache.getGatewaySenders(); + assertEquals(senders.size(), 1); + GatewaySender gatewaySender = senders.iterator().next(); + Set regionQueues = ((AbstractGatewaySender) gatewaySender) + .getQueues(); + assertEquals(regionQueues.size(), 1); + RegionQueue regionQueue = regionQueues.iterator().next(); + assertEquals(true, regionQueue.getRegion().getAttributes() + .isDiskSynchronous()); + } finally { + exTKSender.remove(); + } + } + + /** + * Test to validate that ParallelGatewaySenderQueue diskSynchronous attribute + * when persistence of sender is not enabled. + */ + @Ignore("TODO: test is disabled") + @Test + public void test_ValidateParallelGatewaySenderQueueAttributes_2() { + Integer localLocPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer remoteLocPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort )); + + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + localLocPort + "]"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setParallel(true);//set parallel to true + fact.setBatchConflationEnabled(true); + fact.setBatchSize(200); + fact.setBatchTimeInterval(300); + fact.setPersistenceEnabled(false);//set persistence to false + fact.setDiskSynchronous(true); + fact.setMaximumQueueMemory(200); + fact.setAlertThreshold(1200); + GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); + fact.addGatewayEventFilter(myEventFilter1); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); + fact.addGatewayTransportFilter(myStreamFilter2); + final IgnoredException ex = IgnoredException.addIgnoredException("Could not connect"); + try { + GatewaySender sender1 = fact.create("TKSender", 2); + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId(sender1.getId()); + factory.setDataPolicy(DataPolicy.PARTITION); + Region region = cache.createRegionFactory(factory.create()).create( + "test_ValidateGatewaySenderAttributes"); + Set senders = cache.getGatewaySenders(); + assertEquals(senders.size(), 1); + GatewaySender gatewaySender = senders.iterator().next(); + Set regionQueues = ((AbstractGatewaySender) gatewaySender) + .getQueues(); + assertEquals(regionQueues.size(), 1); + RegionQueue regionQueue = regionQueues.iterator().next(); + assertEquals(false, regionQueue.getRegion().getAttributes() + .isDiskSynchronous()); + } finally { + ex.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java new file mode 100644 index 0000000..87a74b9 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.IgnoredException; + +/** + * + */ +@Category(DistributedTest.class) +public class ParallelWANConflationDUnitTest extends WANTestBase { + private static final long serialVersionUID = 1L; + + public ParallelWANConflationDUnitTest() { + super(); + } + + @Override + protected final void postSetUpWANTestBase() throws Exception { + IgnoredException.addIgnoredException("java.net.ConnectException"); + } + + @Test + public void testParallelPropagationConflationDisabled() throws Exception { + initialSetUp(); + + createSendersNoConflation(); + + createSenderPRs(); + + startPausedSenders(); + + createReceiverPrs(); + + final Map keyValues = putKeyValues(); + + vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() )); + + final Map updateKeyValues = updateKeyValues(); + + vm4.invoke(() ->checkQueueSize( "ln", (keyValues.size() + updateKeyValues.size()) )); + + vm2.invoke(() ->validateRegionSize( + getTestMethodName(), 0 )); + + resumeSenders(); + + keyValues.putAll(updateKeyValues); + validateReceiverRegionSize(keyValues); + + } + + /** + * This test is disabled as it is not guaranteed to pass it everytime. This + * test is related to the conflation in batch. yet did find any way to + * ascertain that the vents in the batch will always be conflated. + * + * @throws Exception + */ + @Test + public void testParallelPropagationBatchConflation() throws Exception { + initialSetUp(); + + vm4.invoke(() ->createSender( "ln", 2, + true, 100, 50, false, false, null, true )); + vm5.invoke(() ->createSender( "ln", 2, + true, 100, 50, false, false, null, true )); + vm6.invoke(() ->createSender( "ln", 2, + true, 100, 50, false, false, null, true )); + vm7.invoke(() ->createSender( "ln", 2, + true, 100, 50, false, false, null, true )); + + createSenderPRs(); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + pauseSenders(); + + createReceiverPrs(); + + final Map keyValues = new HashMap(); + + for (int i = 1; i <= 10; i++) { + for (int j = 1; j <= 10; j++) { + keyValues.put(j, i) ; + } + vm4.invoke(() ->putGivenKeyValue( + getTestMethodName(), keyValues )); + } + + vm4.invoke(() ->enableConflation( "ln" )); + vm5.invoke(() ->enableConflation( "ln" )); + vm6.invoke(() ->enableConflation( "ln" )); + vm7.invoke(() ->enableConflation( "ln" )); + + resumeSenders(); + + ArrayList v4List = (ArrayList)vm4.invoke(() -> + WANTestBase.getSenderStats( "ln", 0 )); + ArrayList v5List = (ArrayList)vm5.invoke(() -> + WANTestBase.getSenderStats( "ln", 0 )); + ArrayList v6List = (ArrayList)vm6.invoke(() -> + WANTestBase.getSenderStats( "ln", 0 )); + ArrayList v7List = (ArrayList)vm7.invoke(() -> + WANTestBase.getSenderStats( "ln", 0 )); + + assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0); + + vm2.invoke(() ->validateRegionSize( + getTestMethodName(), 10 )); + + } + + @Test + public void testParallelPropagationConflation() throws Exception { + doTestParallelPropagationConflation(0); + } + + @Test + public void testParallelPropagationConflationRedundancy2() throws Exception { + doTestParallelPropagationConflation(2); + } + + public void doTestParallelPropagationConflation(int redundancy) throws Exception { + initialSetUp(); + + createSendersWithConflation(); + + createSenderPRs(redundancy); + + startPausedSenders(); + + createReceiverPrs(); + + final Map keyValues = putKeyValues(); + + vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() )); + final Map updateKeyValues = updateKeyValues(); + + vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); // creates aren't conflated + + vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), updateKeyValues )); + + vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); // creates aren't conflated + + vm2.invoke(() ->validateRegionSize( + getTestMethodName(), 0 )); + + resumeSenders(); + + keyValues.putAll(updateKeyValues); + validateReceiverRegionSize(keyValues); + } + + @Test + public void testParallelPropagationConflationOfRandomKeys() throws Exception { + initialSetUp(); + + createSendersWithConflation(); + + createSenderPRs(); + + startPausedSenders(); + + createReceiverPrs(); + + final Map keyValues = putKeyValues(); + + vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() )); + + final Map updateKeyValues = new HashMap(); + while(updateKeyValues.size()!=10) { + int key = (new Random()).nextInt(keyValues.size()); + updateKeyValues.put(key, key+"_updated"); + } + vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), updateKeyValues )); + + vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); + + vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), updateKeyValues )); + + vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); + + vm2.invoke(() ->validateRegionSize( + getTestMethodName(), 0 )); + + resumeSenders(); + + + keyValues.putAll(updateKeyValues); + validateReceiverRegionSize(keyValues); + + } + + @Test + public void testParallelPropagationColocatedRegionConflation() + throws Exception { + initialSetUp(); + + createSendersWithConflation(); + + createOrderShipmentOnSenders(); + + startPausedSenders(); + + createOrderShipmentOnReceivers(); + + Map custKeyValues = (Map)vm4.invoke(() ->putCustomerPartitionedRegion( 20 )); + Map orderKeyValues = (Map)vm4.invoke(() ->putOrderPartitionedRegion( 20 )); + Map shipmentKeyValues = (Map)vm4.invoke(() ->putShipmentPartitionedRegion( 20 )); + + vm4.invoke(() -> + WANTestBase.checkQueueSize( + "ln", + (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues + .size()) )); + + Map updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 )); + Map updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegion( 10 )); + Map updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegion( 10 )); + int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues + .size()) + + updatedCustKeyValues.size() + + updatedOrderKeyValues.size() + + updatedShipmentKeyValues.size(); + vm4.invoke(() -> + WANTestBase.checkQueueSize( + "ln", + sum)); + + + updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 )); + updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegion( 10 )); + updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegion( 10 )); + int sum2 = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues + .size()) + + updatedCustKeyValues.size() + + updatedOrderKeyValues.size() + + updatedShipmentKeyValues.size(); + vm4.invoke(() -> + WANTestBase.checkQueueSize( + "ln", + sum2)); + + vm2.invoke(() ->validateRegionSize( + WANTestBase.customerRegionName, 0 )); + vm2.invoke(() ->validateRegionSize( + WANTestBase.orderRegionName, 0 )); + vm2.invoke(() ->validateRegionSize( + WANTestBase.shipmentRegionName, 0 )); + + resumeSenders(); + + custKeyValues.putAll(updatedCustKeyValues); + orderKeyValues.putAll(updatedOrderKeyValues); + shipmentKeyValues.putAll(updatedShipmentKeyValues); + + validateColocatedRegionContents(custKeyValues, orderKeyValues, + shipmentKeyValues); + + } + + // + //This is the same as the previous test, except for the UsingCustId methods + @Test + public void testParallelPropagationColocatedRegionConflationSameKey() + throws Exception { + initialSetUp(); + + createSendersWithConflation(); + + createOrderShipmentOnSenders(); + + startPausedSenders(); + + createOrderShipmentOnReceivers(); + + Map custKeyValues = (Map)vm4.invoke(() ->putCustomerPartitionedRegion( 20 )); + Map orderKeyValues = (Map)vm4.invoke(() ->putOrderPartitionedRegionUsingCustId( 20 )); + Map shipmentKeyValues = (Map)vm4.invoke(() ->putShipmentPartitionedRegionUsingCustId( 20 )); + + vm4.invoke(() ->checkQueueSize( "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues + .size()) )); + + Map updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 )); + Map updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegionUsingCustId( 10 )); + Map updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegionUsingCustId( 10 )); + int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues + .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() ; + + vm4.invoke(() ->checkQueueSize( "ln", sum)); + + updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 )); + updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegionUsingCustId( 10 )); + updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegionUsingCustId( 10 )); + + int sum2 = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues + .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size(); + vm4.invoke(() ->checkQueueSize( "ln", sum2)); + + vm2.invoke(() ->validateRegionSize( + WANTestBase.customerRegionName, 0 )); + vm2.invoke(() ->validateRegionSize( + WANTestBase.orderRegionName, 0 )); + vm2.invoke(() ->validateRegionSize( + WANTestBase.shipmentRegionName, 0 )); + + resumeSenders(); + + custKeyValues.putAll(updatedCustKeyValues); + orderKeyValues.putAll(updatedOrderKeyValues); + shipmentKeyValues.putAll(updatedShipmentKeyValues); + + validateColocatedRegionContents(custKeyValues, orderKeyValues, + shipmentKeyValues); + } + + protected void validateColocatedRegionContents(Map custKeyValues, + Map orderKeyValues, Map shipmentKeyValues) { + vm2.invoke(() ->validateRegionSize( + WANTestBase.customerRegionName, custKeyValues.size() )); + vm2.invoke(() ->validateRegionSize( + WANTestBase.orderRegionName, orderKeyValues.size() )); + vm2.invoke(() ->validateRegionSize( + WANTestBase.shipmentRegionName, shipmentKeyValues.size() )); + + vm2.invoke(() ->validateRegionContents( + WANTestBase.customerRegionName, custKeyValues )); + vm2.invoke(() ->validateRegionContents( + WANTestBase.orderRegionName, orderKeyValues )); + vm2.invoke(() ->validateRegionContents( + WANTestBase.shipmentRegionName, shipmentKeyValues )); + + vm3.invoke(() ->validateRegionSize( + WANTestBase.customerRegionName, custKeyValues.size() )); + vm3.invoke(() ->validateRegionSize( + WANTestBase.orderRegionName, orderKeyValues.size() )); + vm3.invoke(() ->validateRegionSize( + WANTestBase.shipmentRegionName, shipmentKeyValues.size() )); + + vm3.invoke(() ->validateRegionContents( + WANTestBase.customerRegionName, custKeyValues )); + vm3.invoke(() ->validateRegionContents( + WANTestBase.orderRegionName, orderKeyValues )); + vm3.invoke(() ->validateRegionContents( + WANTestBase.shipmentRegionName, shipmentKeyValues )); + } + + protected void createOrderShipmentOnReceivers() { + vm2.invoke(() ->createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap() )); + vm3.invoke(() ->createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap() )); + } + + protected void createOrderShipmentOnSenders() { + vm4.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() )); + vm5.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() )); + vm6.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() )); + vm7.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() )); + } + + protected Map updateKeyValues() { + final Map updateKeyValues = new HashMap(); + for(int i=0;i<10;i++) { + updateKeyValues.put(i, i+"_updated"); + } + + vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), updateKeyValues )); + return updateKeyValues; + } + + protected Map putKeyValues() { + final Map keyValues = new HashMap(); + for(int i=0; i< 20; i++) { + keyValues.put(i, i); + } + + + vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), keyValues )); + return keyValues; + } + + protected void validateReceiverRegionSize(final Map keyValues) { + vm2.invoke(() ->validateRegionSize( + getTestMethodName(), keyValues.size() )); + vm3.invoke(() ->validateRegionSize( + getTestMethodName(), keyValues.size() )); + + vm2.invoke(() ->validateRegionContents( + getTestMethodName(), keyValues )); + vm3.invoke(() ->validateRegionContents( + getTestMethodName(), keyValues )); + } + + protected void resumeSenders() { + vm4.invoke(() ->resumeSender( "ln" )); + vm5.invoke(() ->resumeSender( "ln" )); + vm6.invoke(() ->resumeSender( "ln" )); + vm7.invoke(() ->resumeSender( "ln" )); + } + + protected void createReceiverPrs() { + vm2.invoke(() ->createPartitionedRegion( + getTestMethodName(), null, 1, 8, isOffHeap() )); + vm3.invoke(() ->createPartitionedRegion( + getTestMethodName(), null, 1, 8, isOffHeap() )); + } + + protected void startPausedSenders() { + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + pauseSenders(); + } + + protected void pauseSenders() { + vm4.invoke(() ->pauseSender( "ln" )); + vm5.invoke(() ->pauseSender( "ln" )); + vm6.invoke(() ->pauseSender( "ln" )); + vm7.invoke(() ->pauseSender( "ln" )); + } + + protected void createSenderPRs() { + createSenderPRs(0); + } + + protected void createSenderPRs(int redundancy) { + vm4.invoke(() ->createPartitionedRegion( + getTestMethodName(), "ln", redundancy, 8, isOffHeap() )); + vm5.invoke(() ->createPartitionedRegion( + getTestMethodName(), "ln", redundancy, 8, isOffHeap() )); + vm6.invoke(() ->createPartitionedRegion( + getTestMethodName(), "ln", redundancy, 8, isOffHeap() )); + vm7.invoke(() ->createPartitionedRegion( + getTestMethodName(), "ln", redundancy, 8, isOffHeap() )); + } + + protected void initialSetUp() { + Integer lnPort = (Integer)vm0.invoke(() ->createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() ->createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + } + + protected void createSendersNoConflation() { + vm4.invoke(() ->createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() ->createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() ->createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() ->createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + } + + protected void createSendersWithConflation() { + vm4.invoke(() ->createSender( "ln", 2, + true, 100, 2, true, false, null, true )); + vm5.invoke(() ->createSender( "ln", 2, + true, 100, 2, true, false, null, true )); + vm6.invoke(() ->createSender( "ln", 2, + true, 100, 2, true, false, null, true )); + vm7.invoke(() ->createSender( "ln", 2, + true, 100, 2, true, false, null, true )); + } + +}