Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A332918B0F for ; Mon, 7 Dec 2015 23:22:53 +0000 (UTC) Received: (qmail 4194 invoked by uid 500); 7 Dec 2015 23:22:53 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 4161 invoked by uid 500); 7 Dec 2015 23:22:53 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 4152 invoked by uid 99); 7 Dec 2015 23:22:53 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Dec 2015 23:22:53 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 03E65C8593 for ; Mon, 7 Dec 2015 23:22:53 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id ext8ORiyo6W3 for ; Mon, 7 Dec 2015 23:22:42 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 870EF20E9B for ; Mon, 7 Dec 2015 23:22:40 +0000 (UTC) Received: (qmail 3933 invoked by uid 99); 7 Dec 2015 23:22:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Dec 2015 23:22:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A2B20E098F; Mon, 7 Dec 2015 23:22:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.incubator.apache.org Date: Mon, 07 Dec 2015 23:22:41 -0000 Message-Id: <5af9e05d9f6549d083a16cce73c59896@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] incubator-geode git commit: GEODE-637: Additional tests for AsyncEventQueues GEODE-637: Additional tests for AsyncEventQueues Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/eccf5239 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/eccf5239 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/eccf5239 Branch: refs/heads/feature/GEODE-637 Commit: eccf5239370f934d61f87c713779fcdd48ad588b Parents: bd43c34 Author: Dan Smith Authored: Wed Dec 2 09:51:49 2015 -0800 Committer: Dan Smith Committed: Mon Dec 7 15:21:25 2015 -0800 ---------------------------------------------------------------------- .../cache/wan/AsyncEventQueueTestBase.java | 1658 +++++++++++++++ .../asyncqueue/AsyncEventListenerDUnitTest.java | 1911 ++++++++++++++++++ .../AsyncEventListenerOffHeapDUnitTest.java | 17 + .../AsyncEventQueueStatsDUnitTest.java | 311 +++ .../ConcurrentAsyncEventQueueDUnitTest.java | 330 +++ ...ncurrentAsyncEventQueueOffHeapDUnitTest.java | 16 + .../CommonParallelAsyncEventQueueDUnitTest.java | 53 + ...ParallelAsyncEventQueueOffHeapDUnitTest.java | 16 + 8 files changed, 4312 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java new file mode 100644 index 0000000..a800118 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java @@ -0,0 +1,1658 @@ +/* + * ========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. This + * product is protected by U.S. and international copyright and intellectual + * property laws. Pivotal products are covered by one or more patents listed at + * http://www.pivotal.io/patents. + * ========================================================================= + */ +package com.gemstone.gemfire.internal.cache.wan; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentHashMap; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.CacheLoader; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.Declarable; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.EntryEvent; +import com.gemstone.gemfire.cache.LoaderHelper; +import com.gemstone.gemfire.cache.PartitionAttributesFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionFactory; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats; +import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter; +import com.gemstone.gemfire.cache.control.RebalanceFactory; +import com.gemstone.gemfire.cache.control.RebalanceOperation; +import com.gemstone.gemfire.cache.control.RebalanceResults; +import com.gemstone.gemfire.cache.control.ResourceManager; +import com.gemstone.gemfire.cache.persistence.PartitionOfflineException; +import com.gemstone.gemfire.cache.util.CacheListenerAdapter; +import com.gemstone.gemfire.cache.wan.GatewayEventFilter; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.ForceReattemptException; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.RegionQueue; + +import dunit.DistributedTestCase; +import dunit.Host; +import dunit.VM; + +public class AsyncEventQueueTestBase extends DistributedTestCase { + + protected static Cache cache; + + protected static VM vm0; + + protected static VM vm1; + + protected static VM vm2; + + protected static VM vm3; + + protected static VM vm4; + + protected static VM vm5; + + protected static VM vm6; + + protected static VM vm7; + + protected static AsyncEventListener eventListener1; + + private static final long MAX_WAIT = 10000; + + protected static GatewayEventFilter eventFilter; + + protected static boolean destroyFlag = false; + + protected static List dispatcherThreads = new ArrayList( + Arrays.asList(1, 3, 5)); + + // this will be set for each test method run with one of the values from above + // list + protected static int numDispatcherThreadsForTheRun = 1; + + public AsyncEventQueueTestBase(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + final Host host = Host.getHost(0); + vm0 = host.getVM(0); + vm1 = host.getVM(1); + vm2 = host.getVM(2); + vm3 = host.getVM(3); + vm4 = host.getVM(4); + vm5 = host.getVM(5); + vm6 = host.getVM(6); + vm7 = host.getVM(7); + // this is done to vary the number of dispatchers for sender + // during every test method run + shuffleNumDispatcherThreads(); + invokeInEveryVM(AsyncEventQueueTestBase.class, + "setNumDispatcherThreadsForTheRun", + new Object[] { dispatcherThreads.get(0) }); + } + + public static void shuffleNumDispatcherThreads() { + Collections.shuffle(dispatcherThreads); + } + + public static void setNumDispatcherThreadsForTheRun(int numThreads) { + numDispatcherThreadsForTheRun = numThreads; + } + + public static Integer createFirstLocatorWithDSId(int dsId) { + if (Locator.hasLocator()) { + Locator.getLocator().stop(); + } + AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + //props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, "" + dsId); + props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + + "]"); + props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + + port + "],server=true,peer=true,hostname-for-clients=localhost"); + test.getSystem(props); + return port; + } + + public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) { + AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, "" + dsId); + props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + + "]"); + props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + + port + "],server=true,peer=true,hostname-for-clients=localhost"); + props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + + remoteLocPort + "]"); + test.getSystem(props); + return port; + } + + public static void createReplicatedRegionWithAsyncEventQueue( + String regionName, String asyncQueueIds, Boolean offHeap) { + ExpectedException exp1 = addExpectedException(ForceReattemptException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (asyncQueueIds != null) { + StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ","); + while (tokenizer.hasMoreTokens()) { + String asyncQueueId = tokenizer.nextToken(); + fact.addAsyncEventQueueId(asyncQueueId); + } + } + fact.setDataPolicy(DataPolicy.REPLICATE); + fact.setOffHeap(offHeap); + RegionFactory regionFactory = cache.createRegionFactory(fact.create()); + Region r = regionFactory.create(regionName); + assertNotNull(r); + } + finally { + exp1.remove(); + } + } + + public static void createReplicatedRegionWithCacheLoaderAndAsyncEventQueue( + String regionName, String asyncQueueIds) { + + AttributesFactory fact = new AttributesFactory(); + if (asyncQueueIds != null) { + StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ","); + while (tokenizer.hasMoreTokens()) { + String asyncQueueId = tokenizer.nextToken(); + fact.addAsyncEventQueueId(asyncQueueId); + } + } + fact.setDataPolicy(DataPolicy.REPLICATE); + // set the CacheLoader + fact.setCacheLoader(new MyCacheLoader()); + RegionFactory regionFactory = cache.createRegionFactory(fact.create()); + Region r = regionFactory.create(regionName); + assertNotNull(r); + } + + public static void createReplicatedRegionWithSenderAndAsyncEventQueue( + String regionName, String senderIds, String asyncChannelId, Boolean offHeap) { + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + try { + + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + fact.setDataPolicy(DataPolicy.REPLICATE); + fact.setOffHeap(offHeap); + fact.setScope(Scope.DISTRIBUTED_ACK); + RegionFactory regionFactory = cache.createRegionFactory(fact.create()); + regionFactory.addAsyncEventQueueId(asyncChannelId); + Region r = regionFactory.create(regionName); + assertNotNull(r); + } + finally { + exp.remove(); + } + } + + public static void createAsyncEventQueue(String asyncChannelId, + boolean isParallel, Integer maxMemory, Integer batchSize, + boolean isConflation, boolean isPersistent, String diskStoreName, + boolean isDiskSynchronous) { + + if (diskStoreName != null) { + File directory = new File(asyncChannelId + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] { directory }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + DiskStore ds = dsf.create(diskStoreName); + } + + AsyncEventListener asyncEventListener = new MyAsyncEventListener(); + + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); + factory.setBatchSize(batchSize); + factory.setPersistent(isPersistent); + factory.setDiskStoreName(diskStoreName); + factory.setDiskSynchronous(isDiskSynchronous); + factory.setBatchConflationEnabled(isConflation); + factory.setMaximumQueueMemory(maxMemory); + factory.setParallel(isParallel); + // set dispatcher threads + factory.setDispatcherThreads(numDispatcherThreadsForTheRun); + AsyncEventQueue asyncChannel = factory.create(asyncChannelId, + asyncEventListener); + } + + public static void createAsyncEventQueueWithListener2(String asyncChannelId, + boolean isParallel, Integer maxMemory, Integer batchSize, + boolean isPersistent, String diskStoreName) { + + if (diskStoreName != null) { + File directory = new File(asyncChannelId + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] { directory }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + DiskStore ds = dsf.create(diskStoreName); + } + + AsyncEventListener asyncEventListener = new MyAsyncEventListener2(); + + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); + factory.setBatchSize(batchSize); + factory.setPersistent(isPersistent); + factory.setDiskStoreName(diskStoreName); + factory.setMaximumQueueMemory(maxMemory); + factory.setParallel(isParallel); + // set dispatcher threads + factory.setDispatcherThreads(numDispatcherThreadsForTheRun); + AsyncEventQueue asyncChannel = factory.create(asyncChannelId, + asyncEventListener); + } + + public static void createAsyncEventQueue(String asyncChannelId, + boolean isParallel, Integer maxMemory, Integer batchSize, + boolean isConflation, boolean isPersistent, String diskStoreName, + boolean isDiskSynchronous, String asyncListenerClass) throws Exception { + + if (diskStoreName != null) { + File directory = new File(asyncChannelId + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] { directory }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + DiskStore ds = dsf.create(diskStoreName); + } + + String packagePrefix = "com.gemstone.gemfire.internal.cache.wan."; + String className = packagePrefix + asyncListenerClass; + AsyncEventListener asyncEventListener = null; + try { + Class clazz = Class.forName(className); + asyncEventListener = (AsyncEventListener)clazz.newInstance(); + } + catch (ClassNotFoundException e) { + throw e; + } + catch (InstantiationException e) { + throw e; + } + catch (IllegalAccessException e) { + throw e; + } + + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); + factory.setBatchSize(batchSize); + factory.setPersistent(isPersistent); + factory.setDiskStoreName(diskStoreName); + factory.setDiskSynchronous(isDiskSynchronous); + factory.setBatchConflationEnabled(isConflation); + factory.setMaximumQueueMemory(maxMemory); + factory.setParallel(isParallel); + // set dispatcher threads + factory.setDispatcherThreads(numDispatcherThreadsForTheRun); + AsyncEventQueue asyncChannel = factory.create(asyncChannelId, + asyncEventListener); + } + + public static void createAsyncEventQueueWithCustomListener( + String asyncChannelId, boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + String diskStoreName, boolean isDiskSynchronous) { + createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, + maxMemory, batchSize, isConflation, isPersistent, diskStoreName, + isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS); + } + + public static void createAsyncEventQueueWithCustomListener( + String asyncChannelId, boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + String diskStoreName, boolean isDiskSynchronous, int nDispatchers) { + + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + + try { + if (diskStoreName != null) { + File directory = new File(asyncChannelId + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] { directory }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + DiskStore ds = dsf.create(diskStoreName); + } + + AsyncEventListener asyncEventListener = new CustomAsyncEventListener(); + + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); + factory.setBatchSize(batchSize); + factory.setPersistent(isPersistent); + factory.setDiskStoreName(diskStoreName); + factory.setMaximumQueueMemory(maxMemory); + factory.setParallel(isParallel); + factory.setDispatcherThreads(nDispatchers); + AsyncEventQueue asyncChannel = factory.create(asyncChannelId, + asyncEventListener); + } + finally { + exp.remove(); + } + } + + public static void createConcurrentAsyncEventQueue(String asyncChannelId, + boolean isParallel, Integer maxMemory, Integer batchSize, + boolean isConflation, boolean isPersistent, String diskStoreName, + boolean isDiskSynchronous, int dispatcherThreads, OrderPolicy policy) { + + if (diskStoreName != null) { + File directory = new File(asyncChannelId + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] { directory }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + DiskStore ds = dsf.create(diskStoreName); + } + + AsyncEventListener asyncEventListener = new MyAsyncEventListener(); + + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); + factory.setBatchSize(batchSize); + factory.setPersistent(isPersistent); + factory.setDiskStoreName(diskStoreName); + factory.setDiskSynchronous(isDiskSynchronous); + factory.setBatchConflationEnabled(isConflation); + factory.setMaximumQueueMemory(maxMemory); + factory.setParallel(isParallel); + factory.setDispatcherThreads(dispatcherThreads); + factory.setOrderPolicy(policy); + AsyncEventQueue asyncChannel = factory.create(asyncChannelId, + asyncEventListener); + } + + public static String createAsyncEventQueueWithDiskStore( + String asyncChannelId, boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isPersistent, String diskStoreName) { + + AsyncEventListener asyncEventListener = new MyAsyncEventListener(); + + File persistentDirectory = null; + if (diskStoreName == null) { + persistentDirectory = new File(asyncChannelId + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + } + else { + persistentDirectory = new File(diskStoreName); + } + getLogWriter().info("The ds is : " + persistentDirectory.getName()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); + factory.setBatchSize(batchSize); + factory.setParallel(isParallel); + if (isPersistent) { + factory.setPersistent(isPersistent); + factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId) + .getName()); + } + factory.setMaximumQueueMemory(maxMemory); + // set dispatcher threads + factory.setDispatcherThreads(numDispatcherThreadsForTheRun); + AsyncEventQueue asyncChannel = factory.create(asyncChannelId, + asyncEventListener); + return persistentDirectory.getName(); + } + + public static void pauseAsyncEventQueue(String asyncChannelId) { + AsyncEventQueue theChannel = null; + + Set asyncEventChannels = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncChannel : asyncEventChannels) { + if (asyncChannelId.equals(asyncChannel.getId())) { + theChannel = asyncChannel; + } + } + + ((AsyncEventQueueImpl)theChannel).getSender().pause(); + } + + public static void pauseAsyncEventQueueAndWaitForDispatcherToPause( + String asyncChannelId) { + AsyncEventQueue theChannel = null; + + Set asyncEventChannels = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncChannel : asyncEventChannels) { + if (asyncChannelId.equals(asyncChannel.getId())) { + theChannel = asyncChannel; + break; + } + } + + ((AsyncEventQueueImpl)theChannel).getSender().pause(); + + ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender()) + .getEventProcessor().waitForDispatcherToPause(); + } + + public static void resumeAsyncEventQueue(String asyncQueueId) { + AsyncEventQueue theQueue = null; + + Set asyncEventChannels = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncChannel : asyncEventChannels) { + if (asyncQueueId.equals(asyncChannel.getId())) { + theQueue = asyncChannel; + } + } + + ((AsyncEventQueueImpl)theQueue).getSender().resume(); + } + + public static void checkAsyncEventQueueSize(String asyncQueueId, + int numQueueEntries) { + AsyncEventQueue theAsyncEventQueue = null; + + Set asyncEventChannels = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncChannel : asyncEventChannels) { + if (asyncQueueId.equals(asyncChannel.getId())) { + theAsyncEventQueue = asyncChannel; + } + } + + GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue) + .getSender(); + + if (sender.isParallel()) { + Set queues = ((AbstractGatewaySender)sender).getQueues(); + assertEquals(numQueueEntries, + queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size()); + } + else { + Set queues = ((AbstractGatewaySender)sender).getQueues(); + int size = 0; + for (RegionQueue q : queues) { + size += q.size(); + } + assertEquals(numQueueEntries, size); + } + } + + /** + * This method verifies the queue size of a ParallelGatewaySender. For + * ParallelGatewaySender conflation happens in a separate thread, hence test + * code needs to wait for some time for expected result + * + * @param asyncQueueId + * Async Queue ID + * @param numQueueEntries + * expected number of Queue entries + * @throws Exception + */ + public static void waitForAsyncEventQueueSize(String asyncQueueId, + final int numQueueEntries) throws Exception { + AsyncEventQueue theAsyncEventQueue = null; + + Set asyncEventChannels = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncChannel : asyncEventChannels) { + if (asyncQueueId.equals(asyncChannel.getId())) { + theAsyncEventQueue = asyncChannel; + } + } + + GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue) + .getSender(); + + if (sender.isParallel()) { + final Set queues = ((AbstractGatewaySender)sender) + .getQueues(); + + waitForCriterion(new WaitCriterion() { + + public String description() { + return "Waiting for EventQueue size to be " + numQueueEntries; + } + + public boolean done() { + boolean done = numQueueEntries == queues.toArray(new RegionQueue[queues + .size()])[0].getRegion().size(); + return done; + } + + }, MAX_WAIT, 500, true); + + } + else { + throw new Exception( + "This method should be used for only ParallelGatewaySender,SerialGatewaySender should use checkAsyncEventQueueSize() method instead"); + + } + } + + public static void createPartitionedRegion(String regionName, + String senderIds, Integer redundantCopies, Integer totalNumBuckets) { + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + ExpectedException exp1 = addExpectedException(PartitionOfflineException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + // GatewaySender sender = cache.getGatewaySender(senderId); + // assertNotNull(sender); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + pfact.setRecoveryDelay(0); + fact.setPartitionAttributes(pfact.create()); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } + finally { + exp.remove(); + exp1.remove(); + } + } + + public static void createPartitionedRegionWithAsyncEventQueue( + String regionName, String asyncEventQueueId, Boolean offHeap) { + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + ExpectedException exp1 = addExpectedException(PartitionOfflineException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()) + .addAsyncEventQueueId(asyncEventQueueId).create(regionName); + assertNotNull(r); + } + finally { + exp.remove(); + exp1.remove(); + } + } + + public static void createColocatedPartitionedRegionWithAsyncEventQueue( + String regionName, String asyncEventQueueId, Integer totalNumBuckets, + String colocatedWith) { + + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + ExpectedException exp1 = addExpectedException(PartitionOfflineException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setColocatedWith(colocatedWith); + fact.setPartitionAttributes(pfact.create()); + Region r = cache.createRegionFactory(fact.create()) + .addAsyncEventQueueId(asyncEventQueueId).create(regionName); + assertNotNull(r); + } + finally { + exp.remove(); + exp1.remove(); + } + } + + public static void createPartitionedRegionWithCacheLoaderAndAsyncQueue( + String regionName, String asyncEventQueueId) { + + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + fact.setPartitionAttributes(pfact.create()); + // set the CacheLoader implementation + fact.setCacheLoader(new MyCacheLoader()); + Region r = cache.createRegionFactory(fact.create()) + .addAsyncEventQueueId(asyncEventQueueId).create(regionName); + assertNotNull(r); + } + + /** + * Create PartitionedRegion with 1 redundant copy + */ + public static void createPRWithRedundantCopyWithAsyncEventQueue( + String regionName, String asyncEventQueueId, Boolean offHeap) { + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + + try { + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + pfact.setRedundantCopies(1); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()) + .addAsyncEventQueueId(asyncEventQueueId).create(regionName); + assertNotNull(r); + } + finally { + exp.remove(); + } + } + + public static void createPartitionedRegionAccessorWithAsyncEventQueue( + String regionName, String asyncEventQueueId) { + AttributesFactory fact = new AttributesFactory(); + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + pfact.setLocalMaxMemory(0); + fact.setPartitionAttributes(pfact.create()); + Region r = cache.createRegionFactory(fact.create()) + .addAsyncEventQueueId(asyncEventQueueId).create(regionName); + // fact.create()).create(regionName); + assertNotNull(r); + } + + protected static void createCache(Integer locPort) { + AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + + "]"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + } + + public static void createCacheWithoutLocator(Integer mCastPort) { + AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "" + mCastPort); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + } + + public static void checkAsyncEventQueueStats(String queueId, + final int queueSize, final int eventsReceived, final int eventsQueued, + final int eventsDistributed) { + Set asyncQueues = cache.getAsyncEventQueues(); + AsyncEventQueue queue = null; + for (AsyncEventQueue q : asyncQueues) { + if (q.getId().equals(queueId)) { + queue = q; + break; + } + } + final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue) + .getStatistics(); + assertEquals(queueSize, statistics.getEventQueueSize()); + assertEquals(eventsReceived, statistics.getEventsReceived()); + assertEquals(eventsQueued, statistics.getEventsQueued()); + assert (statistics.getEventsDistributed() >= eventsDistributed); + } + + public static void checkAsyncEventQueueConflatedStats( + String asyncEventQueueId, final int eventsConflated) { + Set queues = cache.getAsyncEventQueues(); + AsyncEventQueue queue = null; + for (AsyncEventQueue q : queues) { + if (q.getId().equals(asyncEventQueueId)) { + queue = q; + break; + } + } + final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue) + .getStatistics(); + assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated()); + } + + public static void checkAsyncEventQueueStats_Failover( + String asyncEventQueueId, final int eventsReceived) { + Set asyncEventQueues = cache.getAsyncEventQueues(); + AsyncEventQueue queue = null; + for (AsyncEventQueue q : asyncEventQueues) { + if (q.getId().equals(asyncEventQueueId)) { + queue = q; + break; + } + } + final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue) + .getStatistics(); + + assertEquals(eventsReceived, statistics.getEventsReceived()); + assertEquals( + eventsReceived, + (statistics.getEventsQueued() + + statistics.getUnprocessedTokensAddedByPrimary() + statistics + .getUnprocessedEventsRemovedByPrimary())); + } + + public static void checkAsyncEventQueueBatchStats(String asyncQueueId, + final int batches) { + Set queues = cache.getAsyncEventQueues(); + AsyncEventQueue queue = null; + for (AsyncEventQueue q : queues) { + if (q.getId().equals(asyncQueueId)) { + queue = q; + break; + } + } + final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue) + .getStatistics(); + assert (statistics.getBatchesDistributed() >= batches); + assertEquals(0, statistics.getBatchesRedistributed()); + } + + public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, + int events) { + Set asyncQueues = cache.getAsyncEventQueues(); + AsyncEventQueue queue = null; + for (AsyncEventQueue q : asyncQueues) { + if (q.getId().equals(asyncQueueId)) { + queue = q; + break; + } + } + final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue) + .getStatistics(); + assertEquals(events, + (statistics.getUnprocessedEventsAddedBySecondary() + statistics + .getUnprocessedTokensRemovedBySecondary())); + assertEquals(events, + (statistics.getUnprocessedEventsRemovedByPrimary() + statistics + .getUnprocessedTokensAddedByPrimary())); + } + + public static void setRemoveFromQueueOnException(String senderId, + boolean removeFromQueue) { + Set senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + assertNotNull(sender); + ((AbstractGatewaySender)sender) + .setRemoveFromQueueOnException(removeFromQueue); + } + + public static void unsetRemoveFromQueueOnException(String senderId) { + Set senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + assertNotNull(sender); + ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(false); + } + + public static void waitForSenderToBecomePrimary(String senderId) { + Set senders = ((GemFireCacheImpl)cache) + .getAllGatewaySenders(); + final GatewaySender sender = getGatewaySenderById(senders, senderId); + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + if (sender != null && ((AbstractGatewaySender)sender).isPrimary()) { + return true; + } + return false; + } + + public String description() { + return "Expected sender primary state to be true but is false"; + } + }; + DistributedTestCase.waitForCriterion(wc, 10000, 1000, true); + } + + private static GatewaySender getGatewaySenderById(Set senders, + String senderId) { + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + return s; + } + } + // if none of the senders matches with the supplied senderid, return null + return null; + } + + public static void createSender(String dsName, int remoteDsId, + boolean isParallel, Integer maxMemory, Integer batchSize, + boolean isConflation, boolean isPersistent, GatewayEventFilter filter, + boolean isManulaStart) { + final ExpectedException exln = addExpectedException("Could not connect"); + try { + File persistentDirectory = new File(dsName + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + if (isParallel) { + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setParallel(true); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManulaStart); + // set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + ((InternalGatewaySenderFactory)gateway) + .setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (filter != null) { + eventFilter = filter; + gateway.addGatewayEventFilter(filter); + } + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.setBatchConflationEnabled(isConflation); + gateway.create(dsName, remoteDsId); + + } + else { + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManulaStart); + // set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + ((InternalGatewaySenderFactory)gateway) + .setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (filter != null) { + eventFilter = filter; + gateway.addGatewayEventFilter(filter); + } + gateway.setBatchConflationEnabled(isConflation); + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.create(dsName, remoteDsId); + } + } + finally { + exln.remove(); + } + } + + public static void pauseWaitCriteria(final long millisec) { + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + return false; + } + + public String description() { + return "Expected to wait for " + millisec + " millisec."; + } + }; + DistributedTestCase.waitForCriterion(wc, millisec, 500, false); + } + + public static int createReceiver(int locPort) { + AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + + "]"); + + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + e.printStackTrace(); + fail("Test " + test.getName() + + " failed to start GatewayRecevier on port " + port); + } + return port; + } + + public static String makePath(String[] strings) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < strings.length; i++) { + sb.append(strings[i]); + sb.append(File.separator); + } + return sb.toString(); + } + + /** + * Do a rebalance and verify balance was improved. If evictionPercentage > 0 + * (the default) then we have heapLRU and this can cause simulate and + * rebalance results to differ if eviction kicks in between. (See BUG 44899). + */ + public static void doRebalance() { + ResourceManager resMan = cache.getResourceManager(); + boolean heapEviction = (resMan.getEvictionHeapPercentage() > 0); + RebalanceFactory factory = resMan.createRebalanceFactory(); + try { + RebalanceResults simulateResults = null; + if (!heapEviction) { + getLogWriter().info("Calling rebalance simulate"); + RebalanceOperation simulateOp = factory.simulate(); + simulateResults = simulateOp.getResults(); + } + + getLogWriter().info("Starting rebalancing"); + RebalanceOperation rebalanceOp = factory.start(); + RebalanceResults rebalanceResults = rebalanceOp.getResults(); + + } + catch (InterruptedException e) { + fail("Interrupted", e); + } + } + + public static void doPuts(String regionName, int numPuts) { + ExpectedException exp1 = addExpectedException(InterruptedException.class + .getName()); + ExpectedException exp2 = addExpectedException(GatewaySenderException.class + .getName()); + try { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + r.put(i, i); + } + } + finally { + exp1.remove(); + exp2.remove(); + } + // for (long i = 0; i < numPuts; i++) { + // r.destroy(i); + // } + } + + /** + * To be used for CacheLoader related tests + */ + public static void doGets(String regionName, int numGets) { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numGets; i++) { + r.get(i); + } + } + + public static void doPutsFrom(String regionName, int from, int numPuts) { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = from; i < numPuts; i++) { + r.put(i, i); + } + } + + public static void doPutAll(String regionName, int numPuts, int size) { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + Map putAllMap = new HashMap(); + for (long j = 0; j < size; j++) { + putAllMap.put((size * i) + j, i); + } + r.putAll(putAllMap, "putAllCallback"); + putAllMap.clear(); + } + } + + public static void putGivenKeyValue(String regionName, Map keyValues) { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (Object key : keyValues.keySet()) { + r.put(key, keyValues.get(key)); + } + } + + public static void doNextPuts(String regionName, int start, int numPuts) { + // waitForSitesToUpdate(); + ExpectedException exp = addExpectedException(CacheClosedException.class + .getName()); + try { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = start; i < numPuts; i++) { + r.put(i, i); + } + } + finally { + exp.remove(); + } + } + + public static void validateRegionSize(String regionName, final int regionSize) { + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try { + + final Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + if (r.keySet().size() == regionSize) { + return true; + } + return false; + } + + public String description() { + return "Expected region entries: " + regionSize + + " but actual entries: " + r.keySet().size() + + " present region keyset " + r.keySet(); + } + }; + DistributedTestCase.waitForCriterion(wc, 240000, 500, true); + } + finally { + exp.remove(); + exp1.remove(); + } + } + + /** + * Validate whether all the attributes set on AsyncEventQueueFactory are set + * on the sender underneath the AsyncEventQueue. + */ + public static void validateAsyncEventQueueAttributes(String asyncChannelId, + int maxQueueMemory, int batchSize, int batchTimeInterval, + boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, + boolean batchConflationEnabled) { + + AsyncEventQueue theChannel = null; + + Set asyncEventChannels = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncChannel : asyncEventChannels) { + if (asyncChannelId.equals(asyncChannel.getId())) { + theChannel = asyncChannel; + } + } + + GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender(); + assertEquals("maxQueueMemory", maxQueueMemory, + theSender.getMaximumQueueMemory()); + assertEquals("batchSize", batchSize, theSender.getBatchSize()); + assertEquals("batchTimeInterval", batchTimeInterval, + theSender.getBatchTimeInterval()); + assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled()); + assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName()); + assertEquals("isDiskSynchronous", isDiskSynchronous, + theSender.isDiskSynchronous()); + assertEquals("batchConflation", batchConflationEnabled, + theSender.isBatchConflationEnabled()); + } + + /** + * Validate whether all the attributes set on AsyncEventQueueFactory are set + * on the sender underneath the AsyncEventQueue. + */ + public static void validateConcurrentAsyncEventQueueAttributes(String asyncChannelId, + int maxQueueMemory, int batchSize, int batchTimeInterval, + boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, + boolean batchConflationEnabled, int dispatcherThreads, OrderPolicy policy) { + + AsyncEventQueue theChannel = null; + + Set asyncEventChannels = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncChannel : asyncEventChannels) { + if (asyncChannelId.equals(asyncChannel.getId())) { + theChannel = asyncChannel; + } + } + + GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender(); + assertEquals("maxQueueMemory", maxQueueMemory, theSender + .getMaximumQueueMemory()); + assertEquals("batchSize", batchSize, theSender.getBatchSize()); + assertEquals("batchTimeInterval", batchTimeInterval, theSender + .getBatchTimeInterval()); + assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled()); + assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName()); + assertEquals("isDiskSynchronous", isDiskSynchronous, theSender + .isDiskSynchronous()); + assertEquals("batchConflation", batchConflationEnabled, theSender + .isBatchConflationEnabled()); + assertEquals("dispatcherThreads", dispatcherThreads, theSender + .getDispatcherThreads()); + assertEquals("orderPolicy", policy, theSender.getOrderPolicy()); + } + + public static void validateAsyncEventListener(String asyncQueueId, + final int expectedSize) { + AsyncEventListener theListener = null; + + Set asyncEventQueues = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncQueue : asyncEventQueues) { + if (asyncQueueId.equals(asyncQueue.getId())) { + theListener = asyncQueue.getAsyncEventListener(); + } + } + + final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap(); + assertNotNull(eventsMap); + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + if (eventsMap.size() == expectedSize) { + return true; + } + return false; + } + + public String description() { + return "Expected map entries: " + expectedSize + + " but actual entries: " + eventsMap.size(); + } + }; + DistributedTestCase.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs + } + + public static void validateAsyncEventForOperationDetail(String asyncQueueId, + final int expectedSize, boolean isLoad, boolean isPutAll) { + + AsyncEventListener theListener = null; + + Set asyncEventQueues = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncQueue : asyncEventQueues) { + if (asyncQueueId.equals(asyncQueue.getId())) { + theListener = asyncQueue.getAsyncEventListener(); + } + } + + final Map eventsMap = ((MyAsyncEventListener_CacheLoader)theListener) + .getEventsMap(); + assertNotNull(eventsMap); + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + if (eventsMap.size() == expectedSize) { + return true; + } + return false; + } + + public String description() { + return "Expected map entries: " + expectedSize + + " but actual entries: " + eventsMap.size(); + } + }; + DistributedTestCase.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs + Collection values = eventsMap.values(); + Iterator itr = values.iterator(); + while (itr.hasNext()) { + AsyncEvent asyncEvent = (AsyncEvent)itr.next(); + if (isLoad) + assertTrue(asyncEvent.getOperation().isLoad()); + if (isPutAll) + assertTrue(asyncEvent.getOperation().isPutAll()); + } + } + + public static void validateCustomAsyncEventListener(String asyncQueueId, + final int expectedSize) { + AsyncEventListener theListener = null; + + Set asyncEventQueues = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncQueue : asyncEventQueues) { + if (asyncQueueId.equals(asyncQueue.getId())) { + theListener = asyncQueue.getAsyncEventListener(); + } + } + + final Map eventsMap = ((CustomAsyncEventListener)theListener) + .getEventsMap(); + assertNotNull(eventsMap); + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + if (eventsMap.size() == expectedSize) { + return true; + } + return false; + } + + public String description() { + return "Expected map entries: " + expectedSize + + " but actual entries: " + eventsMap.size(); + } + }; + DistributedTestCase.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs + + Iterator itr = eventsMap.values().iterator(); + while (itr.hasNext()) { + AsyncEvent event = itr.next(); + assertTrue("possibleDuplicate should be true for event: " + event, + event.getPossibleDuplicate()); + } + } + + public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) { + AsyncEventQueue theAsyncEventQueue = null; + + Set asyncEventChannels = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncChannel : asyncEventChannels) { + if (asyncQueueId.equals(asyncChannel.getId())) { + theAsyncEventQueue = asyncChannel; + } + } + + final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue) + .getSender(); + + if (sender.isParallel()) { + final Set queues = ((AbstractGatewaySender)sender) + .getQueues(); + + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + int size = 0; + for (RegionQueue q : queues) { + size += q.size(); + } + if (size == 0) { + return true; + } + return false; + } + + public String description() { + int size = 0; + for (RegionQueue q : queues) { + size += q.size(); + } + return "Expected queue size to be : " + 0 + " but actual entries: " + + size; + } + }; + DistributedTestCase.waitForCriterion(wc, 60000, 500, true); + + } + else { + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + Set queues = ((AbstractGatewaySender)sender).getQueues(); + int size = 0; + for (RegionQueue q : queues) { + size += q.size(); + } + if (size == 0) { + return true; + } + return false; + } + + public String description() { + Set queues = ((AbstractGatewaySender)sender).getQueues(); + int size = 0; + for (RegionQueue q : queues) { + size += q.size(); + } + return "Expected queue size to be : " + 0 + " but actual entries: " + + size; + } + }; + DistributedTestCase.waitForCriterion(wc, 60000, 500, true); + } + } + + public static void verifyAsyncEventListenerForPossibleDuplicates( + String asyncEventQueueId, Set bucketIds, int batchSize) { + AsyncEventListener theListener = null; + + Set asyncEventQueues = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncQueue : asyncEventQueues) { + if (asyncEventQueueId.equals(asyncQueue.getId())) { + theListener = asyncQueue.getAsyncEventListener(); + } + } + + final Map> bucketToEventsMap = ((MyAsyncEventListener2)theListener) + .getBucketToEventsMap(); + assertNotNull(bucketToEventsMap); + assertTrue(bucketIds.size() > 1); + + for (int bucketId : bucketIds) { + List eventsForBucket = bucketToEventsMap + .get(bucketId); + getLogWriter().info( + "Events for bucket: " + bucketId + " is " + eventsForBucket); + assertNotNull(eventsForBucket); + for (int i = 0; i < batchSize; i++) { + GatewaySenderEventImpl senderEvent = eventsForBucket.get(i); + assertTrue(senderEvent.getPossibleDuplicate()); + } + } + } + + public static int getAsyncEventListenerMapSize(String asyncEventQueueId) { + AsyncEventListener theListener = null; + + Set asyncEventQueues = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncQueue : asyncEventQueues) { + if (asyncEventQueueId.equals(asyncQueue.getId())) { + theListener = asyncQueue.getAsyncEventListener(); + } + } + + final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap(); + assertNotNull(eventsMap); + getLogWriter().info("The events map size is " + eventsMap.size()); + return eventsMap.size(); + } + + public static int getAsyncEventQueueSize(String asyncEventQueueId) { + AsyncEventQueue theQueue = null; + + Set asyncEventQueues = cache.getAsyncEventQueues(); + for (AsyncEventQueue asyncQueue : asyncEventQueues) { + if (asyncEventQueueId.equals(asyncQueue.getId())) { + theQueue = asyncQueue; + } + } + assertNotNull(theQueue); + return theQueue.size(); + } + + public static String getRegionFullPath(String regionName) { + final Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + return r.getFullPath(); + } + + public static Set getAllPrimaryBucketsOnTheNode(String regionName) { + PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); + return region.getDataStore().getAllLocalPrimaryBucketIds(); + } + + public static void addCacheListenerAndCloseCache(String regionName) { + final Region region = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(region); + CacheListenerAdapter cl = new CacheListenerAdapter() { + @Override + public void afterCreate(EntryEvent event) { + if ((Long)event.getKey() == 900) { + cache.getLogger().fine(" Gateway sender is killed by a test"); + cache.close(); + cache.getDistributedSystem().disconnect(); + } + } + }; + region.getAttributesMutator().addCacheListener(cl); + } + + public static Boolean killSender(String senderId) { + final ExpectedException exln = addExpectedException("Could not connect"); + ExpectedException exp = addExpectedException(CacheClosedException.class + .getName()); + ExpectedException exp1 = addExpectedException(ForceReattemptException.class + .getName()); + try { + Set senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + if (sender.isPrimary()) { + getLogWriter().info("Gateway sender is killed by a test"); + cache.getDistributedSystem().disconnect(); + return Boolean.TRUE; + } + return Boolean.FALSE; + } + finally { + exp.remove(); + exp1.remove(); + exln.remove(); + } + } + + public static Boolean killAsyncEventQueue(String asyncQueueId) { + Set queues = cache.getAsyncEventQueues(); + AsyncEventQueueImpl queue = null; + for (AsyncEventQueue q : queues) { + if (q.getId().equals(asyncQueueId)) { + queue = (AsyncEventQueueImpl)q; + break; + } + } + if (queue.isPrimary()) { + getLogWriter().info("AsyncEventQueue is killed by a test"); + cache.getDistributedSystem().disconnect(); + return Boolean.TRUE; + } + return Boolean.FALSE; + } + + public static void killSender() { + getLogWriter().info("Gateway sender is going to be killed by a test"); + cache.close(); + cache.getDistributedSystem().disconnect(); + getLogWriter().info("Gateway sender is killed by a test"); + } + + public static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter { + + private final Set discoveredLocators = new HashSet(); + + private final Set removedLocators = new HashSet(); + + public synchronized void locatorsDiscovered(List locators) { + discoveredLocators.addAll(locators); + notifyAll(); + } + + public synchronized void locatorsRemoved(List locators) { + removedLocators.addAll(locators); + notifyAll(); + } + + public boolean waitForDiscovery(InetSocketAddress locator, long time) + throws InterruptedException { + return waitFor(discoveredLocators, locator, time); + } + + public boolean waitForRemove(InetSocketAddress locator, long time) + throws InterruptedException { + return waitFor(removedLocators, locator, time); + } + + private synchronized boolean waitFor(Set set, InetSocketAddress locator, + long time) throws InterruptedException { + long remaining = time; + long endTime = System.currentTimeMillis() + time; + while (!set.contains(locator) && remaining >= 0) { + wait(remaining); + remaining = endTime - System.currentTimeMillis(); + } + return set.contains(locator); + } + + public synchronized Set getDiscovered() { + return new HashSet(discoveredLocators); + } + + public synchronized Set getRemoved() { + return new HashSet(removedLocators); + } + } + + public void tearDown2() throws Exception { + super.tearDown2(); + cleanupVM(); + vm0.invoke(AsyncEventQueueTestBase.class, "cleanupVM"); + vm1.invoke(AsyncEventQueueTestBase.class, "cleanupVM"); + vm2.invoke(AsyncEventQueueTestBase.class, "cleanupVM"); + vm3.invoke(AsyncEventQueueTestBase.class, "cleanupVM"); + vm4.invoke(AsyncEventQueueTestBase.class, "cleanupVM"); + vm5.invoke(AsyncEventQueueTestBase.class, "cleanupVM"); + vm6.invoke(AsyncEventQueueTestBase.class, "cleanupVM"); + vm7.invoke(AsyncEventQueueTestBase.class, "cleanupVM"); + } + + public static void cleanupVM() throws IOException { + closeCache(); + } + + public static void closeCache() throws IOException { + if (cache != null && !cache.isClosed()) { + cache.close(); + cache.getDistributedSystem().disconnect(); + cache = null; + } + else { + AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName); + if (test.isConnectedToDS()) { + test.getSystem().disconnect(); + } + } + } + + public static void shutdownLocator() { + AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName); + test.getSystem().disconnect(); + } + + public static void printEventListenerMap() { + ((MyGatewaySenderEventListener)eventListener1).printMap(); + } + + + @Override + public InternalDistributedSystem getSystem(Properties props) { + // For now all WANTestBase tests allocate off-heap memory even though + // many of them never use it. + // The problem is that WANTestBase has static methods that create instances + // of WANTestBase (instead of instances of the subclass). So we can't override + // this method so that only the off-heap subclasses allocate off heap memory. + props.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, "300m"); + return super.getSystem(props); + } + + /** + * Returns true if the test should create off-heap regions. + * OffHeap tests should over-ride this method and return false. + */ + public boolean isOffHeap() { + return false; + } + +} + +class MyAsyncEventListener_CacheLoader implements AsyncEventListener { + private final Map eventsMap; + + public MyAsyncEventListener_CacheLoader() { + this.eventsMap = new ConcurrentHashMap(); + } + + public boolean processEvents(List events) { + for (AsyncEvent event : events) { + this.eventsMap.put(event.getKey(), event); + } + return true; + } + + public Map getEventsMap() { + return eventsMap; + } + + public void close() { + } +} + +class MyCacheLoader implements CacheLoader, Declarable { + + public Object load(LoaderHelper helper) { + Long key = (Long)helper.getKey(); + return "LoadedValue" + "_" + key; + } + + public void close() { + } + + public void init(Properties props) { + } + +}