Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E139618891 for ; Fri, 11 Dec 2015 21:22:52 +0000 (UTC) Received: (qmail 69253 invoked by uid 500); 11 Dec 2015 21:22:52 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 69145 invoked by uid 500); 11 Dec 2015 21:22:52 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 69129 invoked by uid 99); 11 Dec 2015 21:22:52 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Dec 2015 21:22:52 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 3669E180A93 for ; Fri, 11 Dec 2015 21:22:52 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.227 X-Spam-Level: * X-Spam-Status: No, score=1.227 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id OWt2-kaW9ceO for ; Fri, 11 Dec 2015 21:22:38 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id C191825085 for ; Fri, 11 Dec 2015 21:22:36 +0000 (UTC) Received: (qmail 66457 invoked by uid 99); 11 Dec 2015 21:22:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Dec 2015 21:22:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7164DE0914; Fri, 11 Dec 2015 21:22:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.incubator.apache.org Date: Fri, 11 Dec 2015 21:23:16 -0000 Message-Id: <27cc1b211d39468dbd68d6d7e55aa588@git.apache.org> In-Reply-To: <696653637cf04b7db23d9c6088fc9419@git.apache.org> References: <696653637cf04b7db23d9c6088fc9419@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [42/50] [abbrv] incubator-geode git commit: GEODE-637: Additional tests for AsyncEventQueues http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java new file mode 100644 index 0000000..1eafbb0 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -0,0 +1,1911 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.internal.cache.wan.asyncqueue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.junit.Ignore; + +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase; + +public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { + + private static final long serialVersionUID = 1L; + + public AsyncEventListenerDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + } + + /** + * Test to verify that AsyncEventQueue can not be created when null listener + * is passed. + */ + public void testCreateAsyncEventQueueWithNullListener() { + AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + + AsyncEventQueueFactory asyncQueueFactory = cache + .createAsyncEventQueueFactory(); + try { + asyncQueueFactory.create("testId", null); + fail("AsyncQueueFactory should not allow to create AsyncEventQueue with null listener"); + } + catch (IllegalArgumentException e) { + // expected + } + + } + + public void testSerialAsyncEventQueueAttributes() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 150, true, true, "testDS", true }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventQueueAttributes", + new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true }); + } + + public void testSerialAsyncEventQueueSize() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm6 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm7 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + pause(1000);// pause at least for the batchTimeInterval + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventQueueSize", new Object[] { "ln" }); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventQueueSize", new Object[] { "ln" }); + assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size); + assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size); + } + + /** + * Added to reproduce defect #50366: + * NullPointerException with AsyncEventQueue#size() when number of dispatchers is more than 1 + */ + public void testConcurrentSerialAsyncEventQueueSize() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + + pause(1000);// pause at least for the batchTimeInterval + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventQueueSize", new Object[] { "ln" }); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventQueueSize", new Object[] { "ln" }); + assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size); + assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size); + } + + /** + * Test configuration:: + * + * Region: Replicated WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: false + */ + + public void testReplicatedSerialAsyncEventQueue() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Verify that the events loaded by CacheLoader reach the AsyncEventListener + * with correct operation detail (added for defect #50237). + */ + public void testReplicatedSerialAsyncEventQueueWithCacheLoader() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue", + new Object[] { testName + "_RR", "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue", + new Object[] { testName + "_RR", "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue", + new Object[] { testName + "_RR", "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue", + new Object[] { testName + "_RR", "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doGets", new Object[] { testName + "_RR", + 10 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail", + new Object[] { "ln", 10, true, false });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail", + new Object[] { "ln", 0, true, false });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail", + new Object[] { "ln", 0, true, false });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail", + new Object[] { "ln", 0, true, false });// secondary + } + + /** + * Test configuration:: + * + * Region: Replicated + * WAN: Serial + * Region persistence enabled: false + * Async queue persistence enabled: false + * + * Error is thrown from AsyncEventListener implementation while processing the batch. + * Added to test the fix done for defect #45152. + */ + + public void testReplicatedSerialAsyncEventQueue_ExceptionScenario() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln", + false, 100, 100, false, false, null, false, 1 }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln", + false, 100, 100, false, false, null, false, 1 }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln", + false, 100, 100, false, false, null, false, 1 }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln", + false, 100, 100, false, false, null, false, 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm6 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm7 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + pause(2000);// pause at least for the batchTimeInterval + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 100 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener", + new Object[] { "ln", 100 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Replicated WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: false AsyncEventQueue conflation enabled: true + */ + public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm6 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm7 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + pause(1000);// pause at least for the batchTimeInterval + + final Map keyValues = new HashMap(); + final Map updateKeyValues = new HashMap(); + for (int i = 0; i < 1000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_RR", keyValues }); + + pause(1000); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() }); + + for (int i = 0; i < 500; i++) { + updateKeyValues.put(i, i + "_updated"); + } + + // Put the update events and check the queue size. + // There should be no conflation with the previous create events. + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_RR", updateKeyValues }); + + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() + updateKeyValues.size() }); + + // Put the update events again and check the queue size. + // There should be conflation with the previous update events. + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_RR", updateKeyValues }); + + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() + updateKeyValues.size() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + + + /** + * Test configuration:: + * + * Region: Replicated WAN: Serial Region persistence enabled: false Async + * event queue persistence enabled: false + * + * Note: The test doesn't create a locator but uses MCAST port instead. + */ + @Ignore("Disabled until I can sort out the hydra dependencies - see bug 52214") + public void DISABLED_testReplicatedSerialAsyncEventQueueWithoutLocator() { + int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + vm4.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator", + new Object[] { mPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator", + new Object[] { mPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator", + new Object[] { mPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator", + new Object[] { mPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Replicated WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: true + * + * No VM is restarted. + */ + + public void testReplicatedSerialAsyncEventQueueWithPeristenceEnabled() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Replicated WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: true + * + * There is only one vm in the site and that vm is restarted + */ + + @Ignore("Disabled for 52351") + public void DISABLED_testReplicatedSerialAsyncEventQueueWithPeristenceEnabled_Restart() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + String firstDStore = (String)vm4.invoke(AsyncEventQueueTestBase.class, + "createAsyncEventQueueWithDiskStore", new Object[] { "ln", false, 100, + 100, true, null }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + // pause async channel and then do the puts + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + // ------------------ KILL VM4 AND REBUILD + // ------------------------------------------ + vm4.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {}); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore", + new Object[] { "ln", false, 100, 100, true, firstDStore }); + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + // ----------------------------------------------------------------------------------- + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + } + + /** + * Test configuration:: + * + * Region: Replicated WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: true + * + * There are 3 VMs in the site and the VM with primary sender is shut down. + */ + @Ignore("Disabled for 52351") + public void DISABLED_testReplicatedSerialAsyncEventQueueWithPeristenceEnabled_Restart2() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore", + new Object[] { "ln", false, 100, 100, true, null }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore", + new Object[] { "ln", false, 100, 100, true, null }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore", + new Object[] { "ln", false, 100, 100, true, null }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm4.invoke(AsyncEventQueueTestBase.class, "addCacheListenerAndCloseCache", + new Object[] { testName + "_RR" }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm5.invoke(AsyncEventQueueTestBase.class, "doPuts", + new Object[] { testName + "_RR", 2000 }); + + // ----------------------------------------------------------------------------------- + vm5.invoke(AsyncEventQueueTestBase.class, "waitForSenderToBecomePrimary", + new Object[] { AsyncEventQueueImpl + .getSenderIdFromAsyncEventQueueId("ln") }); + + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + + getLogWriter().info("vm4 size is: " + vm4size); + getLogWriter().info("vm5 size is: " + vm5size); + // verify that there is no event loss + assertTrue( + "Total number of entries in events map on vm4 and vm5 should be at least 2000", + (vm4size + vm5size) >= 2000); + } + + /** + * Test configuration:: + * + * Region: Replicated + * WAN: Serial + * Dispatcher threads: more than 1 + * Order policy: key based ordering + */ + public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY }); + vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY }); + vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Replicated + * WAN: Serial + * Region persistence enabled: false + * Async queue persistence enabled: false + */ + public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion_2() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD }); + vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD }); + vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invokeAsync(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 500 }); + vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts", new Object[] { testName + "_RR", + 500, 1000 }); + vm4.invokeAsync(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000, 1500 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + } + + /** + * Dispatcher threads set to more than 1 but no order policy set. + * Added for defect #50514. + */ + public void testConcurrentSerialAsyncEventQueueWithoutOrderPolicy() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, null }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, null }); + vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, null }); + vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false, 3, null }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] {"ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Partitioned WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: false + */ + public void testPartitionedSerialAsyncEventQueue() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 500 }); + vm5.invoke(AsyncEventQueueTestBase.class, "doPutsFrom", new Object[] { + testName + "_PR", 500, 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Partitioned WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: false AsyncEventQueue conflation enabled: true + */ + public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm6 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm7 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + + pause(2000); + + final Map keyValues = new HashMap(); + final Map updateKeyValues = new HashMap(); + for (int i = 0; i < 1000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", keyValues }); + + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() }); + + for (int i = 0; i < 500; i++) { + updateKeyValues.put(i, i + "_updated"); + } + + // Put the update events and check the queue size. + // There should be no conflation with the previous create events. + vm5.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", updateKeyValues }); + + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() + updateKeyValues.size() }); + + // Put the update events again and check the queue size. + // There should be conflation with the previous update events. + vm5.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", updateKeyValues }); + + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() + updateKeyValues.size() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Partitioned WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: true + * + * No VM is restarted. + */ + public void testPartitionedSerialAsyncEventQueueWithPeristenceEnabled() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, true, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, true, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, true, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, true, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 500 }); + vm5.invoke(AsyncEventQueueTestBase.class, "doPutsFrom", new Object[] { + testName + "_PR", 500, 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Partitioned WAN: Serial Region persistence enabled: false Async + * channel persistence enabled: true + * + * There is only one vm in the site and that vm is restarted + */ + public void testPartitionedSerialAsyncEventQueueWithPeristenceEnabled_Restart() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + String firstDStore = (String)vm4.invoke(AsyncEventQueueTestBase.class, + "createAsyncEventQueueWithDiskStore", new Object[] { "ln", false, 100, + 100, true, null }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + // pause async channel and then do the puts + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueueAndWaitForDispatcherToPause", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 1000 }); + + // ------------------ KILL VM4 AND REBUILD + // ------------------------------------------ + vm4.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {}); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore", + new Object[] { "ln", false, 100, 100, true, firstDStore }); + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + // ----------------------------------------------------------------------------------- + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + } + + public void testParallelAsyncEventQueueWithReplicatedRegion() { + try { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { + "ln", true, 100, 100, true, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { + "ln", true, 100, 100, true, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { + "ln", true, 100, 100, true, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { + "ln", true, 100, 100, true, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, + "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + fail("Expected GatewaySenderConfigException where parallel async event queue can not be used with replicated region"); + } + catch (Exception e) { + if (!e.getCause().getMessage() + .contains("can not be used with replicated region")) { + fail("Expected GatewaySenderConfigException where parallel async event queue can not be used with replicated region"); + } + } + } + + public void testParallelAsyncEventQueue() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 256 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + + assertEquals(vm4size + vm5size + vm6size + vm7size, 256); + } + + /** + * Verify that the events reaching the AsyncEventListener have correct operation detail. + * (added for defect #50237). + */ + public void testParallelAsyncEventQueueWithCacheLoader() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithCacheLoaderAndAsyncQueue", + new Object[] { testName + "_PR", "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithCacheLoaderAndAsyncQueue", + new Object[] { testName + "_PR", "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithCacheLoaderAndAsyncQueue", + new Object[] { testName + "_PR", "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithCacheLoaderAndAsyncQueue", + new Object[] { testName + "_PR", "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPutAll", new Object[] { testName + "_PR", + 100, 10 }); + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail", + new Object[] { "ln", 250, false, true }); + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail", + new Object[] { "ln", 250, false, true }); + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail", + new Object[] { "ln", 250, false, true }); + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail", + new Object[] { "ln", 250, false, true }); + } + + public void testParallelAsyncEventQueueSize() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm6 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm7 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + pause(1000);// pause at least for the batchTimeInterval + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 1000 }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventQueueSize", new Object[] { "ln" }); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventQueueSize", new Object[] { "ln" }); + + assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size); + assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size); + } + + /** + * Added to reproduce defect #50366: + * NullPointerException with AsyncEventQueue#size() when number of dispatchers is more than 1 + */ + public void testConcurrentParallelAsyncEventQueueSize() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY }); + vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY }); + vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm6 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm7 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + pause(1000);// pause at least for the batchTimeInterval + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 1000 }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventQueueSize", new Object[] { "ln" }); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventQueueSize", new Object[] { "ln" }); + + assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size); + assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size); + } + + public void testParallelAsyncEventQueueWithConflationEnabled() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, true, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, true, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, true, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, true, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm6 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm7 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + + pause(2000);// pause for the batchTimeInterval to ensure that all the + // senders are paused + + final Map keyValues = new HashMap(); + final Map updateKeyValues = new HashMap(); + for (int i = 0; i < 1000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", keyValues }); + + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() }); + + for (int i = 0; i < 500; i++) { + updateKeyValues.put(i, i + "_updated"); + } + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", updateKeyValues }); + + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() + updateKeyValues.size() }); // no conflation of creates + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", updateKeyValues }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() + updateKeyValues.size() }); // conflation of updates + + vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + + assertEquals(vm4size + vm5size + vm6size + vm7size, keyValues.size()); + } + + /** + * Added to reproduce defect #47213 + */ + public void testParallelAsyncEventQueueWithConflationEnabled_bug47213() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, true, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, true, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, true, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, true, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm6 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm7 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + + pause(2000);// pause for the batchTimeInterval to ensure that all the + // senders are paused + + final Map keyValues = new HashMap(); + final Map updateKeyValues = new HashMap(); + for (int i = 0; i < 1000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", keyValues }); + + pause(2000); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() }); + + for (int i = 0; i < 500; i++) { + updateKeyValues.put(i, i + "_updated"); + } + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", updateKeyValues }); + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { + testName + "_PR", updateKeyValues }); + + // pause to ensure that events have been conflated. + pause(2000); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { + "ln", keyValues.size() + updateKeyValues.size() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + + assertEquals(vm4size + vm5size + vm6size + vm7size, keyValues.size()); + + } + + public void testParallelAsyncEventQueueWithOneAccessor() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm3.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + + vm3.invoke(AsyncEventQueueTestBase.class, + "createPartitionedRegionAccessorWithAsyncEventQueue", new Object[] { + testName + "_PR", "ln" }); + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm3.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 256 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + vm3.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + + assertEquals(vm4size + vm5size + vm6size + vm7size, 256); + + } + + public void testParallelAsyncEventQueueWithPersistence() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, true, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, true, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, true, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, true, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 256 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + + assertEquals(vm4size + vm5size + vm6size + vm7size, 256); + } + + /** + * Below test is disabled intentionally Replicated region with Parallel Async + * Event queue is not supported. Test is added for the same + * testParallelAsyncEventQueueWithReplicatedRegion + * + * We are gone support this configuration in upcoming releases + */ + + public void DISABLED_DUETO_BUG51491_testReplicatedParallelAsyncEventQueue() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + + assertEquals(vm4size + vm5size + vm6size + vm7size, 1000); + } + +/** + * Test case to test possibleDuplicates. vm4 & vm5 are hosting the PR. vm5 is + * killed so the buckets hosted by it are shifted to vm4. + */ + @Ignore("Disabled for 52349") + public void DISABLED_testParallelAsyncEventQueueHA_Scenario1() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + getLogWriter().info("Created the cache"); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithListener2", + new Object[] { "ln", true, 100, 5, false, null }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithListener2", + new Object[] { "ln", true, 100, 5, false, null }); + + getLogWriter().info("Created the AsyncEventQueue"); + + vm4.invoke(AsyncEventQueueTestBase.class, + "createPRWithRedundantCopyWithAsyncEventQueue", new Object[] { + testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, + "createPRWithRedundantCopyWithAsyncEventQueue", new Object[] { + testName + "_PR", "ln", isOffHeap() }); + + getLogWriter().info("Created PR with AsyncEventQueue"); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + vm5 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + pause(1000);// pause for the batchTimeInterval to make sure the AsyncQueue + // is paused + + getLogWriter().info("Paused the AsyncEventQueue"); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", + new Object[] { testName + "_PR", 80 }); + + getLogWriter().info("Done puts"); + + Set primaryBucketsVm5 = (Set)vm5.invoke( + AsyncEventQueueTestBase.class, "getAllPrimaryBucketsOnTheNode", + new Object[] { testName + "_PR" }); + + getLogWriter().info("Primary buckets on vm5: " + primaryBucketsVm5); + // ---------------------------- Kill vm5 -------------------------- + vm5.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {}); + + pause(1000);// give some time for rebalancing to happen + vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] {