Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BABD2181BA for ; Tue, 8 Dec 2015 23:42:20 +0000 (UTC) Received: (qmail 86509 invoked by uid 500); 8 Dec 2015 23:42:20 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 86402 invoked by uid 500); 8 Dec 2015 23:42:20 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 86386 invoked by uid 99); 8 Dec 2015 23:42:20 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Dec 2015 23:42:20 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 120D6180999 for ; Tue, 8 Dec 2015 23:42:20 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.227 X-Spam-Level: * X-Spam-Status: No, score=1.227 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id V3A646JStwn1 for ; Tue, 8 Dec 2015 23:42:13 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 6A4CF20314 for ; Tue, 8 Dec 2015 23:42:11 +0000 (UTC) Received: (qmail 85280 invoked by uid 99); 8 Dec 2015 23:42:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Dec 2015 23:42:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 44496E0972; Tue, 8 Dec 2015 23:42:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.incubator.apache.org Date: Tue, 08 Dec 2015 23:42:10 -0000 Message-Id: <857b57c838c74a7ca8fd9e434448095c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] incubator-geode git commit: GEODE-637: Additional tests for AsyncEventQueues Repository: incubator-geode Updated Branches: refs/heads/develop 386d1ac8c -> 476c6cd3b http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java new file mode 100644 index 0000000..b050ef5 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java @@ -0,0 +1,17 @@ +package com.gemstone.gemfire.internal.cache.wan.asyncqueue; + + +@SuppressWarnings("serial") +public class AsyncEventListenerOffHeapDUnitTest extends + AsyncEventListenerDUnitTest { + + public AsyncEventListenerOffHeapDUnitTest(String name) { + super(name); + } + + @Override + public boolean isOffHeap() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java new file mode 100644 index 0000000..cf4a184 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java @@ -0,0 +1,311 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.internal.cache.wan.asyncqueue; + +import java.util.HashMap; +import java.util.Map; + +import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase; + +import dunit.AsyncInvocation; + +public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { + + private static final long serialVersionUID = 1L; + + public AsyncEventQueueStatsDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + } + + /** + * Normal replication scenario + */ + public void testReplicatedSerialPropagation() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 });// primary sender + pause(2000);//give some time for system to become stable + + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] { + "ln", 0, 1000, 1000, 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats", + new Object[] { "ln", 10 }); + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] { + "ln", 0, 1000, 0, 0 }); + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats", + new Object[] { "ln", 0 }); + } + + /** + * Two listeners added to the same RR. + */ + public void testAsyncStatsTwoListeners() throws Exception { + Integer lnPort = createFirstLocatorWithDSId(1); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1", + false, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1", + false, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1", + false, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2", + false, 100, 100, false, false, null, false }); + vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2", + false, 100, 100, false, false, null, false }); + vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2", + false, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln1", 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln2", 1000 }); + pause(2000);//give some time for system to become stable + + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] { + "ln1", 0, 1000, 1000, 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats", + new Object[] { "ln1", 10 }); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] { + "ln2", 0, 1000, 1000, 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats", + new Object[] { "ln2", 10 }); + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] { + "ln1", 0, 1000, 0, 0 }); + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats", + new Object[] { "ln1", 0 }); + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] { + "ln2", 0, 1000, 0, 0 }); + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats", + new Object[] { "ln2", 0 }); + } + + /** + * HA scenario: kill one vm when puts are in progress on the other vm. + */ + public void testReplicatedSerialPropagationHA() throws Exception { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + AsyncInvocation inv1 = vm5.invokeAsync(AsyncEventQueueTestBase.class, "doPuts", + new Object[] { testName + "_RR", 10000 }); + pause(2000); + AsyncInvocation inv2 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "killAsyncEventQueue", new Object[] { "ln" }); + Boolean isKilled = Boolean.FALSE; + try { + isKilled = (Boolean)inv2.getResult(); + } + catch (Throwable e) { + fail("Unexpected exception while killing a AsyncEventQueue"); + } + AsyncInvocation inv3 = null; + if(!isKilled){ + inv3 = vm5.invokeAsync(AsyncEventQueueTestBase.class, "killSender", new Object[] { "ln" }); + inv3.join(); + } + inv1.join(); + inv2.join(); + pause(2000);//give some time for system to become stable + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats_Failover", new Object[] {"ln", 10000}); + } + + /** + * Two regions attached to same AsyncEventQueue + */ + public void testReplicatedSerialPropagationUNPorcessedEvents() throws Exception { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, false, false, null, false }); + + //create one RR (RR_1) on local site + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + + //create another RR (RR_2) on local site + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + + //start puts in RR_1 in another thread + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR_1", 1000 }); + //do puts in RR_2 in main thread + vm4.invoke(AsyncEventQueueTestBase.class, "doPutsFrom", new Object[] { testName + "_RR_2", 1000, 1500 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1500 }); + + pause(2000);//give some time for system to become stable + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {"ln", + 0, 1500, 1500, 1500}); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueUnprocessedStats", new Object[] {"ln", 0}); + + + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {"ln", + 0, 1500, 0, 0}); + vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueUnprocessedStats", new Object[] {"ln", 1500}); + } + + /** + * Test with conflation enabled + */ + public void testSerialPropagationConflation() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + false, 100, 100, true, false, null, false }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4 + .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", + new Object[] { "ln" }); + //pause at least for the batchTimeInterval to make sure that the AsyncEventQueue is actually paused + pause(2000); + + final Map keyValues = new HashMap(); + final Map updateKeyValues = new HashMap(); + for(int i=0; i< 1000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", keyValues }); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() }); + + for(int i=0;i<500;i++) { + updateKeyValues.put(i, i+"_updated"); + } + + // Put the update events and check the queue size. + // There should be no conflation with the previous create events. + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", updateKeyValues }); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); + + // Put the update events again and check the queue size. + // There should be conflation with the previous update events. + vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", updateKeyValues }); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", new Object[] { "ln" }); + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 1000 }); + + pause(2000);// give some time for system to become stable + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] { + "ln", 0, 2000, 2000, 1000 }); + vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueConflatedStats", + new Object[] { "ln", 500 }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java new file mode 100644 index 0000000..2fb7496 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java @@ -0,0 +1,330 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase; + +import dunit.AsyncInvocation; + +/** + * @author skumar + * + */ +public class ConcurrentAsyncEventQueueDUnitTest extends AsyncEventQueueTestBase { + + private static final long serialVersionUID = 1L; + + public ConcurrentAsyncEventQueueDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + } + + public void testConcurrentSerialAsyncEventQueueAttributes() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 150, true, true, "testDS", true, 5, OrderPolicy.THREAD }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes", + new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.THREAD }); + } + + + public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyKey() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 150, true, true, "testDS", true, 5, OrderPolicy.KEY }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes", + new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.KEY }); + } + + public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyPartition() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 150, true, true, "testDS", true, 5, OrderPolicy.PARTITION }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes", + new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.PARTITION }); + } + + /** + * Test configuration:: + * + * Region: Replicated + * WAN: Serial + * Dispatcher threads: more than 1 + * Order policy: key based ordering + */ + + public void testReplicatedSerialAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyKey() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY }); + vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY }); + vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 100 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 100 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: Replicated + * WAN: Serial + * Dispatcher threads: more than 1 + * Order policy: Thread ordering + */ + + public void testReplicatedSerialAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyThread() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD }); + vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD }); + vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", + new Object[] { testName + "_RR", "ln", isOffHeap() }); + + AsyncInvocation inv1 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR", + 50 }); + AsyncInvocation inv2 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts", new Object[] { testName + "_RR", + 50, 100 }); + AsyncInvocation inv3 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts", new Object[] { testName + "_RR", + 100, 150 }); + + try { + inv1.join(); + inv2.join(); + inv3.join(); + } catch (InterruptedException ie) { + fail( + "Cought interrupted exception while waiting for the task tgo complete.", + ie); + } + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 150 });// primary sender + vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener", + new Object[] { "ln", 0 });// secondary + } + + /** + * Test configuration:: + * + * Region: PartitionedRegion + * WAN: Parallel + * Dispatcher threads: more than 1 + * Order policy: key based ordering + */ + // Disabling test for bug #48323 + public void testPartitionedParallelAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyKey() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY }); + vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY }); + vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln", + true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 100 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize", + new Object[] { "ln"}); + + assertEquals(vm4size + vm5size + vm6size + vm7size, 100); + + } + + + /** + * Test configuration:: + * + * Region: PartitionedRegion + * WAN: Parallel + * Dispatcher threads: more than 1 + * Order policy: PARTITION based ordering + */ + // Disabled test for bug #48323 + public void testPartitionedParallelAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyPartition() { + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", + new Object[] { "ln", true, 100, 10, true, false, null, false, 3, + OrderPolicy.PARTITION }); + vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", + new Object[] { "ln", true, 100, 10, true, false, null, false, 3, + OrderPolicy.PARTITION }); + vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", + new Object[] { "ln", true, 100, 10, true, false, null, false, 3, + OrderPolicy.PARTITION }); + vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", + new Object[] { "ln", true, 100, 10, true, false, null, false, 3, + OrderPolicy.PARTITION }); + + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR", "ln", isOffHeap() }); + + vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR", + 100 }); + + vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", + new Object[] { "ln" }); + + int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, + "getAsyncEventListenerMapSize", new Object[] { "ln" }); + + assertEquals(100, vm4size + vm5size + vm6size + vm7size); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java new file mode 100644 index 0000000..41eb22d --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java @@ -0,0 +1,16 @@ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +@SuppressWarnings("serial") +public class ConcurrentAsyncEventQueueOffHeapDUnitTest extends + ConcurrentAsyncEventQueueDUnitTest { + + public ConcurrentAsyncEventQueueOffHeapDUnitTest(String name) { + super(name); + } + + @Override + public boolean isOffHeap() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java new file mode 100644 index 0000000..425d1a6 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java @@ -0,0 +1,53 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase; + +/** + * @author skumar + * + */ +public class CommonParallelAsyncEventQueueDUnitTest extends AsyncEventQueueTestBase { + + private static final long serialVersionUID = 1L; + + public CommonParallelAsyncEventQueueDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + } + + public void testSameSenderWithNonColocatedRegions() throws Exception { + addExpectedException("cannot have the same parallel async"); + Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln", + true, 100, 100, false, false, null, false }); + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR1", "ln", isOffHeap() }); + try { + vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue", + new Object[] { testName + "_PR2", "ln", isOffHeap() }); + fail("Expected IllegateStateException : cannot have the same parallel gateway sender"); + } + catch (Exception e) { + if (!(e.getCause() instanceof IllegalStateException) + || !(e.getCause().getMessage() + .contains("cannot have the same parallel async event queue id"))) { + fail("Expected IllegalStateException", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java new file mode 100644 index 0000000..8ab77b9 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java @@ -0,0 +1,16 @@ +package com.gemstone.gemfire.internal.cache.wan.misc; + +@SuppressWarnings("serial") +public class CommonParallelAsyncEventQueueOffHeapDUnitTest extends + CommonParallelAsyncEventQueueDUnitTest { + + public CommonParallelAsyncEventQueueOffHeapDUnitTest(String name) { + super(name); + } + + @Override + public boolean isOffHeap() { + return true; + } + +}