Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3C9C819E6E for ; Fri, 1 Apr 2016 17:32:25 +0000 (UTC) Received: (qmail 46426 invoked by uid 500); 1 Apr 2016 17:32:25 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 46397 invoked by uid 500); 1 Apr 2016 17:32:25 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 46383 invoked by uid 99); 1 Apr 2016 17:32:24 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Apr 2016 17:32:24 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 5E2D0C0CF3 for ; Fri, 1 Apr 2016 17:32:24 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.021 X-Spam-Level: X-Spam-Status: No, score=-4.021 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id KDC4EYr1zjyY for ; Fri, 1 Apr 2016 17:32:12 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id A43325FDFB for ; Fri, 1 Apr 2016 17:32:10 +0000 (UTC) Received: (qmail 45402 invoked by uid 99); 1 Apr 2016 17:32:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Apr 2016 17:32:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 79EE2E04EB; Fri, 1 Apr 2016 17:32:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jasonhuynh@apache.org To: commits@geode.incubator.apache.org Date: Fri, 01 Apr 2016 17:32:14 -0000 Message-Id: <6505074e3c704bba93a08d0f0b3d0cb0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [6/7] incubator-geode git commit: GEODE-1062: Refactor of WANTestBase http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java index d73084b..6685451 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -132,15 +133,15 @@ public class WANTestBase extends DistributedTestCase{ protected static Cache cache; protected static Region region; - + protected static PartitionedRegion customerRegion; protected static PartitionedRegion orderRegion; protected static PartitionedRegion shipmentRegion; - + protected static final String customerRegionName = "CUSTOMER"; protected static final String orderRegionName = "ORDER"; protected static final String shipmentRegionName = "SHIPMENT"; - + protected static VM vm0; protected static VM vm1; protected static VM vm2; @@ -149,26 +150,26 @@ public class WANTestBase extends DistributedTestCase{ protected static VM vm5; protected static VM vm6; protected static VM vm7; - + protected static QueueListener listener1; protected static QueueListener listener2; - + protected static List gatewayListeners; - + protected static AsyncEventListener eventListener1 ; protected static AsyncEventListener eventListener2 ; private static final long MAX_WAIT = 10000; - + protected static GatewayEventFilter eventFilter; - + protected static boolean destroyFlag = false; - - protected static List dispatcherThreads = + + protected static List dispatcherThreads = new ArrayList(Arrays.asList(1, 3, 5)); //this will be set for each test method run with one of the values from above list protected static int numDispatcherThreadsForTheRun = 1; - + public WANTestBase(String name) { super(name); } @@ -201,15 +202,15 @@ public class WANTestBase extends DistributedTestCase{ protected void postSetUpWANTestBase() throws Exception { } - + public static void shuffleNumDispatcherThreads() { - Collections.shuffle(dispatcherThreads); + Collections.shuffle(dispatcherThreads); } - + public static void setNumDispatcherThreadsForTheRun(int numThreads) { numDispatcherThreadsForTheRun = numThreads; } - + public static void stopOldLocator() { if (Locator.hasLocator()) { Locator.getLocator().stop(); @@ -225,19 +226,19 @@ public class WANTestBase extends DistributedTestCase{ localLocatorBuffer.deleteCharAt(0); localLocatorBuffer.deleteCharAt(localLocatorBuffer.lastIndexOf("]")); String localLocator = localLocatorBuffer.toString(); - localLocator = localLocator.replace(" ", ""); - + localLocator = localLocator.replace(" ", ""); + props.setProperty(DistributionConfig.LOCATORS_NAME, localLocator); props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); StringBuffer remoteLocatorBuffer = new StringBuffer(remoteLocatorsList.toString()); remoteLocatorBuffer.deleteCharAt(0); remoteLocatorBuffer.deleteCharAt(remoteLocatorBuffer.lastIndexOf("]")); String remoteLocator = remoteLocatorBuffer.toString(); - remoteLocator = remoteLocator.replace(" ", ""); + remoteLocator = remoteLocator.replace(" ", ""); props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, remoteLocator); test.getSystem(props); } - + public static Integer createFirstLocatorWithDSId(int dsId) { stopOldLocator(); WANTestBase test = new WANTestBase(getTestMethodName()); @@ -250,7 +251,7 @@ public class WANTestBase extends DistributedTestCase{ test.getSystem(props); return port; } - + public static Integer createFirstPeerLocator(int dsId) { stopOldLocator(); WANTestBase test = new WANTestBase(getTestMethodName()); @@ -263,7 +264,7 @@ public class WANTestBase extends DistributedTestCase{ test.getSystem(props); return port; } - + public static Integer createSecondLocator(int dsId, int locatorPort) { stopOldLocator(); WANTestBase test = new WANTestBase(getTestMethodName()); @@ -289,7 +290,7 @@ public class WANTestBase extends DistributedTestCase{ test.getSystem(props); return port; } - + public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) { stopOldLocator(); WANTestBase test = new WANTestBase(getTestMethodName()); @@ -303,7 +304,7 @@ public class WANTestBase extends DistributedTestCase{ test.getSystem(props); return port; } - + public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) { WANTestBase test = new WANTestBase(getTestMethodName()); Properties props = test.getDistributedSystemProperties(); @@ -316,8 +317,8 @@ public class WANTestBase extends DistributedTestCase{ test.getSystem(props); return; } - - + + public static Integer createFirstRemotePeerLocator(int dsId, int remoteLocPort) { stopOldLocator(); WANTestBase test = new WANTestBase(getTestMethodName()); @@ -331,7 +332,7 @@ public class WANTestBase extends DistributedTestCase{ test.getSystem(props); return port; } - + public static Integer createSecondRemoteLocator(int dsId, int localPort, int remoteLocPort) { stopOldLocator(); @@ -346,7 +347,7 @@ public class WANTestBase extends DistributedTestCase{ test.getSystem(props); return port; } - + public static Integer createSecondRemotePeerLocator(int dsId, int localPort, int remoteLocPort) { stopOldLocator(); @@ -361,7 +362,7 @@ public class WANTestBase extends DistributedTestCase{ test.getSystem(props); return port; } - + public static void createReplicatedRegion(String regionName, String senderIds, Boolean offHeap){ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class .getName()); @@ -409,7 +410,7 @@ public class WANTestBase extends DistributedTestCase{ Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); } - + // public static void createReplicatedRegion_PDX(String regionName, String senderId, DataPolicy policy, InterestPolicy intPolicy){ // AttributesFactory fact = new AttributesFactory(); // if(senderId!= null){ @@ -427,7 +428,7 @@ public class WANTestBase extends DistributedTestCase{ // assertNotNull(r); // assertTrue(r.size() == 0); // } - + public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){ AttributesFactory fact = new AttributesFactory(); if(senderIds!= null){ @@ -444,7 +445,7 @@ public class WANTestBase extends DistributedTestCase{ Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); } - + // public static void createReplicatedRegionWithParallelSenderId(String regionName, String senderId){ // AttributesFactory fact = new AttributesFactory(); // if(senderId!= null){ @@ -458,14 +459,14 @@ public class WANTestBase extends DistributedTestCase{ // Region r = cache.createRegionFactory(fact.create()).create(regionName); // assertNotNull(r); // } - + // public static void createReplicatedRegion(String regionName){ // AttributesFactory fact = new AttributesFactory(); // fact.setDataPolicy(DataPolicy.REPLICATE); // Region r = cache.createRegionFactory(fact.create()).create(regionName); // assertNotNull(r); // } - + public static void createReplicatedRegionWithAsyncEventQueue( String regionName, String asyncQueueIds, Boolean offHeap) { IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class @@ -489,10 +490,10 @@ public class WANTestBase extends DistributedTestCase{ exp1.remove(); } } - + public static void createPersistentReplicatedRegionWithAsyncEventQueue( String regionName, String asyncQueueIds) { - + AttributesFactory fact = new AttributesFactory(); if(asyncQueueIds != null){ StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ","); @@ -506,9 +507,9 @@ public class WANTestBase extends DistributedTestCase{ Region r = regionFactory.create(regionName); assertNotNull(r); } - - - + + + public static void createReplicatedRegionWithSenderAndAsyncEventQueue( String regionName, String senderIds, String asyncChannelId, Boolean offHeap) { IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class @@ -535,7 +536,7 @@ public class WANTestBase extends DistributedTestCase{ exp.remove(); } } - + public static void createReplicatedRegion(String regionName, String senderIds, Scope scope, DataPolicy policy, Boolean offHeap){ AttributesFactory fact = new AttributesFactory(); if(senderIds!= null){ @@ -553,12 +554,12 @@ public class WANTestBase extends DistributedTestCase{ Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); } - + public static void createAsyncEventQueue( - String asyncChannelId, boolean isParallel, - Integer maxMemory, Integer batchSize, boolean isConflation, + String asyncChannelId, boolean isParallel, + Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) { - + if (diskStoreName != null) { File directory = new File(asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -568,9 +569,9 @@ public class WANTestBase extends DistributedTestCase{ dsf.setDiskDirs(dirs1); DiskStore ds = dsf.create(diskStoreName); } - + AsyncEventListener asyncEventListener = new MyAsyncEventListener(); - + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); factory.setBatchSize(batchSize); factory.setPersistent(isPersistent); @@ -583,11 +584,11 @@ public class WANTestBase extends DistributedTestCase{ factory.setDispatcherThreads(numDispatcherThreadsForTheRun); AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener); } - + public static void createAsyncEventQueueWithListener2(String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isPersistent, String diskStoreName) { - + if (diskStoreName != null) { File directory = new File(asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -597,7 +598,7 @@ public class WANTestBase extends DistributedTestCase{ dsf.setDiskDirs(dirs1); DiskStore ds = dsf.create(diskStoreName); } - + AsyncEventListener asyncEventListener = new MyAsyncEventListener2(); AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); @@ -611,12 +612,12 @@ public class WANTestBase extends DistributedTestCase{ AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener); } - + public static void createAsyncEventQueue( - String asyncChannelId, boolean isParallel, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isPersistent, + String asyncChannelId, boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass) throws Exception { - + if (diskStoreName != null) { File directory = new File(asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -626,7 +627,7 @@ public class WANTestBase extends DistributedTestCase{ dsf.setDiskDirs(dirs1); DiskStore ds = dsf.create(diskStoreName); } - + String packagePrefix = "com.gemstone.gemfire.internal.cache.wan."; String className = packagePrefix + asyncListenerClass; AsyncEventListener asyncEventListener = null; @@ -640,7 +641,7 @@ public class WANTestBase extends DistributedTestCase{ } catch (IllegalAccessException e) { throw e; } - + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); factory.setBatchSize(batchSize); factory.setPersistent(isPersistent); @@ -653,7 +654,7 @@ public class WANTestBase extends DistributedTestCase{ factory.setDispatcherThreads(numDispatcherThreadsForTheRun); AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener); } - + public static void createAsyncEventQueueWithCustomListener( String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, @@ -661,7 +662,7 @@ public class WANTestBase extends DistributedTestCase{ createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, maxMemory, batchSize, isConflation, isPersistent, diskStoreName, isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS); } - + public static void createAsyncEventQueueWithCustomListener( String asyncChannelId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, @@ -698,11 +699,11 @@ public class WANTestBase extends DistributedTestCase{ } public static void createConcurrentAsyncEventQueue( - String asyncChannelId, boolean isParallel, - Integer maxMemory, Integer batchSize, boolean isConflation, + String asyncChannelId, boolean isParallel, + Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, int dispatcherThreads, OrderPolicy policy) { - + if (diskStoreName != null) { File directory = new File(asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -712,9 +713,9 @@ public class WANTestBase extends DistributedTestCase{ dsf.setDiskDirs(dirs1); DiskStore ds = dsf.create(diskStoreName); } - + AsyncEventListener asyncEventListener = new MyAsyncEventListener(); - + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); factory.setBatchSize(batchSize); factory.setPersistent(isPersistent); @@ -727,27 +728,27 @@ public class WANTestBase extends DistributedTestCase{ factory.setOrderPolicy(policy); AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener); } - - + + public static String createAsyncEventQueueWithDiskStore( - String asyncChannelId, boolean isParallel, - Integer maxMemory, Integer batchSize, + String asyncChannelId, boolean isParallel, + Integer maxMemory, Integer batchSize, boolean isPersistent, String diskStoreName) { - + AsyncEventListener asyncEventListener = new MyAsyncEventListener(); - + File persistentDirectory = null; if (diskStoreName == null) { persistentDirectory = new File(asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); } else { - persistentDirectory = new File(diskStoreName); + persistentDirectory = new File(diskStoreName); } LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName()); persistentDirectory.mkdir(); DiskStoreFactory dsf = cache.createDiskStoreFactory(); File [] dirs1 = new File[] {persistentDirectory}; - + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); factory.setBatchSize(batchSize); factory.setParallel(isParallel); @@ -761,23 +762,23 @@ public class WANTestBase extends DistributedTestCase{ AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener); return persistentDirectory.getName(); } - + public static void pauseAsyncEventQueue(String asyncChannelId) { AsyncEventQueue theChannel = null; - + Set asyncEventChannels = cache.getAsyncEventQueues(); for (AsyncEventQueue asyncChannel : asyncEventChannels) { if (asyncChannelId.equals(asyncChannel.getId())) { theChannel = asyncChannel; } } - + ((AsyncEventQueueImpl)theChannel).getSender().pause(); } - + public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(String asyncChannelId) { AsyncEventQueue theChannel = null; - + Set asyncEventChannels = cache.getAsyncEventQueues(); for (AsyncEventQueue asyncChannel : asyncEventChannels) { if (asyncChannelId.equals(asyncChannel.getId())) { @@ -785,39 +786,39 @@ public class WANTestBase extends DistributedTestCase{ break; } } - + ((AsyncEventQueueImpl)theChannel).getSender().pause(); - - + + ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender()).getEventProcessor().waitForDispatcherToPause(); } - + public static void resumeAsyncEventQueue(String asyncQueueId) { AsyncEventQueue theQueue = null; - + Set asyncEventChannels = cache.getAsyncEventQueues(); for (AsyncEventQueue asyncChannel : asyncEventChannels) { if (asyncQueueId.equals(asyncChannel.getId())) { theQueue = asyncChannel; } } - + ((AsyncEventQueueImpl)theQueue).getSender().resume(); } - - + + public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) { AsyncEventQueue theAsyncEventQueue = null; - + Set asyncEventChannels = cache.getAsyncEventQueues(); for (AsyncEventQueue asyncChannel : asyncEventChannels) { if (asyncQueueId.equals(asyncChannel.getId())) { theAsyncEventQueue = asyncChannel; } } - + GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue).getSender(); - + if (sender.isParallel()) { Set queues = ((AbstractGatewaySender)sender).getQueues(); assertEquals(numQueueEntries, @@ -831,12 +832,12 @@ public class WANTestBase extends DistributedTestCase{ assertEquals(numQueueEntries, size); } } - + /** * This method verifies the queue size of a ParallelGatewaySender. For * ParallelGatewaySender conflation happens in a separate thread, hence test * code needs to wait for some time for expected result - * + * * @param asyncQueueId * Async Queue ID * @param numQueueEntries @@ -881,7 +882,7 @@ public class WANTestBase extends DistributedTestCase{ } } - + public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class .getName()); @@ -912,7 +913,7 @@ public class WANTestBase extends DistributedTestCase{ exp1.remove(); } } - + // TODO:OFFHEAP: add offheap flavor public static void createPartitionedRegionWithPersistence(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets) { @@ -976,7 +977,7 @@ public class WANTestBase extends DistributedTestCase{ exp1.remove(); } } - + public static void addSenderThroughAttributesMutator(String regionName, String senderIds){ final Region r = cache.getRegion(Region.SEPARATOR + regionName); @@ -984,7 +985,7 @@ public class WANTestBase extends DistributedTestCase{ AttributesMutator mutator = r.getAttributesMutator(); mutator.addGatewaySenderId(senderIds); } - + public static void addAsyncEventQueueThroughAttributesMutator( String regionName, String queueId) { final Region r = cache.getRegion(Region.SEPARATOR + regionName); @@ -992,7 +993,7 @@ public class WANTestBase extends DistributedTestCase{ AttributesMutator mutator = r.getAttributesMutator(); mutator.addAsyncEventQueueId(queueId); } - + public static void createPartitionedRegionWithAsyncEventQueue( String regionName, String asyncEventQueueId, Boolean offHeap) { IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class @@ -1015,10 +1016,10 @@ public class WANTestBase extends DistributedTestCase{ exp1.remove(); } } - + public static void createColocatedPartitionedRegionWithAsyncEventQueue( String regionName, String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) { - + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class .getName()); IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class @@ -1039,7 +1040,7 @@ public class WANTestBase extends DistributedTestCase{ exp1.remove(); } } - + public static void createPersistentPartitionedRegionWithAsyncEventQueue( String regionName, String asyncEventQueueId) { AttributesFactory fact = new AttributesFactory(); @@ -1058,7 +1059,7 @@ public class WANTestBase extends DistributedTestCase{ Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); } - + /** * Create PartitionedRegion with 1 redundant copy */ @@ -1083,7 +1084,7 @@ public class WANTestBase extends DistributedTestCase{ exp.remove(); } } - + public static void createPartitionedRegionAccessorWithAsyncEventQueue( String regionName, String asyncEventQueueId) { AttributesFactory fact = new AttributesFactory(); @@ -1097,7 +1098,7 @@ public class WANTestBase extends DistributedTestCase{ //fact.create()).create(regionName); assertNotNull(r); } - + public static void createPartitionedRegionAsAccessor( String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){ AttributesFactory fact = new AttributesFactory(); @@ -1118,7 +1119,7 @@ public class WANTestBase extends DistributedTestCase{ Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); } - + public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName, String serialSenderIds, String parallelSenderIds, String colocatedWith, Boolean offHeap){ AttributesFactory fact = new AttributesFactory(); if (serialSenderIds != null) { @@ -1146,14 +1147,14 @@ public class WANTestBase extends DistributedTestCase{ Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); } - + public static void createPersistentPartitionedRegion( - String regionName, - String senderIds, - Integer redundantCopies, + String regionName, + String senderIds, + Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ - + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class .getName()); IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class @@ -1184,7 +1185,7 @@ public class WANTestBase extends DistributedTestCase{ exp1.remove(); } } - + public static void createCustomerOrderShipmentPartitionedRegion( String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) { @@ -1272,7 +1273,7 @@ public class WANTestBase extends DistributedTestCase{ exp.remove(); } } - + public static void createColocatedPartitionedRegions(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ AttributesFactory fact = new AttributesFactory(); if(senderIds!= null){ @@ -1291,17 +1292,17 @@ public class WANTestBase extends DistributedTestCase{ fact.setOffHeap(offHeap); Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); - + pfact.setColocatedWith(r.getName()); fact.setPartitionAttributes(pfact.create()); fact.setOffHeap(offHeap); Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1"); - assertNotNull(r1); - + assertNotNull(r1); + Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2"); assertNotNull(r2); } - + public static void createColocatedPartitionedRegions2 (String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ AttributesFactory fact = new AttributesFactory(); if(senderIds!= null){ @@ -1320,36 +1321,57 @@ public class WANTestBase extends DistributedTestCase{ fact.setOffHeap(offHeap); Region r = cache.createRegionFactory(fact.create()).create(regionName); assertNotNull(r); - - + + fact = new AttributesFactory(); pfact.setColocatedWith(r.getName()); fact.setPartitionAttributes(pfact.create()); fact.setOffHeap(offHeap); Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1"); - assertNotNull(r1); - + assertNotNull(r1); + Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2"); assertNotNull(r2); } - + + public static void createCacheInVMs(Integer locatorPort, VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> createCache(locatorPort)); + } + } + + public static void createCacheInVMsAsync(Integer locatorPort, VM... vms) { + List tasks = new LinkedList<>(); + for (VM vm : vms) { + tasks.add(vm.invokeAsync(() -> createCache(locatorPort))); + } + for (AsyncInvocation invocation : tasks) { + try { + invocation.join(60000); + } + catch (InterruptedException e) { + fail("Failed starting up the cache"); + } + } + } + public static void createCache(Integer locPort){ createCache(false, locPort); } public static void createManagementCache(Integer locPort){ createCache(true, locPort); } - + public static void createCacheConserveSockets(Boolean conserveSockets,Integer locPort){ WANTestBase test = new WANTestBase(getTestMethodName()); Properties props = test.getDistributedSystemProperties(); props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]"); - props.setProperty(DistributionConfig.CONSERVE_SOCKETS_NAME, conserveSockets.toString()); + props.setProperty(DistributionConfig.CONSERVE_SOCKETS_NAME, conserveSockets.toString()); InternalDistributedSystem ds = test.getSystem(props); cache = CacheFactory.create(ds); } - + protected static void createCache(boolean management, Integer locPort) { WANTestBase test = new WANTestBase(getTestMethodName()); Properties props = test.getDistributedSystemProperties(); @@ -1362,9 +1384,9 @@ public class WANTestBase extends DistributedTestCase{ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]"); InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); + cache = CacheFactory.create(ds); } - + protected static void createCacheWithSSL(Integer locPort) { WANTestBase test = new WANTestBase(getTestMethodName()); @@ -1372,31 +1394,31 @@ public class WANTestBase extends DistributedTestCase{ String gatewaySslprotocols = "any"; String gatewaySslciphers = "any"; boolean gatewaySslRequireAuth = true; - + Properties gemFireProps = test.getDistributedSystemProperties(); gemFireProps.put(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); gemFireProps.put(DistributionConfig.GATEWAY_SSL_ENABLED_NAME, String.valueOf(gatewaySslenabled)); gemFireProps.put(DistributionConfig.GATEWAY_SSL_PROTOCOLS_NAME, gatewaySslprotocols); gemFireProps.put(DistributionConfig.GATEWAY_SSL_CIPHERS_NAME, gatewaySslciphers); gemFireProps.put(DistributionConfig.GATEWAY_SSL_REQUIRE_AUTHENTICATION_NAME, String.valueOf(gatewaySslRequireAuth)); - + gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_TYPE_NAME, "jks"); - gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME, + gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME, TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.keystore")); gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_PASSWORD_NAME, "password"); - gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME, + gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME, TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.truststore")); gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_PASSWORD_NAME, "password"); - + gemFireProps.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); gemFireProps.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]"); - + LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps); - + InternalDistributedSystem ds = test.getSystem(gemFireProps); - cache = CacheFactory.create(ds); + cache = CacheFactory.create(ds); } - + public static void createCache_PDX(Integer locPort){ WANTestBase test = new WANTestBase(getTestMethodName()); Properties props = test.getDistributedSystemProperties(); @@ -1407,13 +1429,13 @@ public class WANTestBase extends DistributedTestCase{ cacheConfig.setPdxPersistent(true); cacheConfig.setPdxDiskStore("PDX_TEST"); cache = GemFireCacheImpl.create(ds, false, cacheConfig); - + File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx"); DiskStoreFactory dsf = cache.createDiskStoreFactory(); File [] dirs1 = new File[] {pdxDir}; DiskStore store = dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST"); } - + public static void createCache(Integer locPort1, Integer locPort2){ WANTestBase test = new WANTestBase(getTestMethodName()); Properties props = test.getDistributedSystemProperties(); @@ -1423,7 +1445,7 @@ public class WANTestBase extends DistributedTestCase{ InternalDistributedSystem ds = test.getSystem(props); cache = CacheFactory.create(ds); } - + public static void createCacheWithoutLocator(Integer mCastPort){ WANTestBase test = new WANTestBase(getTestMethodName()); Properties props = test.getDistributedSystemProperties(); @@ -1431,10 +1453,10 @@ public class WANTestBase extends DistributedTestCase{ InternalDistributedSystem ds = test.getSystem(props); cache = CacheFactory.create(ds); } - + /** * Method that creates a bridge server - * + * * @return Integer Port on which the server is started. */ public static Integer createCacheServer() { @@ -1452,16 +1474,16 @@ public class WANTestBase extends DistributedTestCase{ return new Integer(server1.getPort()); } - + /** * Returns a Map that contains the count for number of bridge server and number * of Receivers. - * + * * @return Map */ public static Map getCacheServers() { List cacheServers = cache.getCacheServers(); - + Map cacheServersMap = new HashMap(); Iterator itr = cacheServers.iterator(); int bridgeServerCounter = 0; @@ -1478,7 +1500,29 @@ public class WANTestBase extends DistributedTestCase{ cacheServersMap.put("ReceiverServer", receiverServerCounter); return cacheServersMap; } - + + public static void startSenderInVMs(String senderId, VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> startSender(senderId)); + } + } + + public static void startSenderInVMsAsync(String senderId, VM... vms) { + List tasks = new LinkedList<>(); + for (VM vm : vms) { + tasks.add(vm.invokeAsync(() -> startSender(senderId))); + } + for (AsyncInvocation invocation : tasks) { + try { + invocation.join(30000); + } + catch (InterruptedException e) { + fail("Starting senders was interrupted"); + } + } + } + + public static void startSender(String senderId) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); @@ -1503,7 +1547,7 @@ public class WANTestBase extends DistributedTestCase{ } } - + public static void enableConflation(String senderId) { Set senders = cache.getGatewaySenders(); AbstractGatewaySender sender = null; @@ -1515,7 +1559,7 @@ public class WANTestBase extends DistributedTestCase{ } sender.test_setBatchConflationEnabled(true); } - + public static void startAsyncEventQueue(String senderId) { Set queues = cache.getAsyncEventQueues(); AsyncEventQueue q = null; @@ -1528,8 +1572,8 @@ public class WANTestBase extends DistributedTestCase{ //merge42180: There is no start method on AsyncEventQueue. Cheetah has this method. Yet the code for AsyncEvnt Queue is not properly merged from cheetah to cedar //q.start(); } - - public static Map getSenderToReceiverConnectionInfo(String senderId){ + + public static Map getSenderToReceiverConnectionInfo(String senderId){ Set senders = cache.getGatewaySenders(); GatewaySender sender = null; for(GatewaySender s : senders){ @@ -1541,14 +1585,14 @@ public class WANTestBase extends DistributedTestCase{ Map connectionInfo = null; if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) { connectionInfo = new HashMap(); - GatewaySenderEventDispatcher dispatcher = + GatewaySenderEventDispatcher dispatcher = ((AbstractGatewaySender)sender).getEventProcessor().getDispatcher(); if (dispatcher instanceof GatewaySenderEventRemoteDispatcher) { - ServerLocation serverLocation = + ServerLocation serverLocation = ((GatewaySenderEventRemoteDispatcher) dispatcher).getConnection(false).getServer(); connectionInfo.put("serverHost", serverLocation.getHostName()); connectionInfo.put("serverPort", serverLocation.getPort()); - + } } return connectionInfo; @@ -1594,7 +1638,7 @@ public class WANTestBase extends DistributedTestCase{ stats.add(statistics.getEventsConflatedFromBatches()); return stats; } - + public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived, final int eventsQueued, final int eventsDistributed) { @@ -1606,7 +1650,7 @@ public class WANTestBase extends DistributedTestCase{ break; } } - + final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); assertEquals(queueSize, statistics.getEventQueueSize()); assertEquals(eventsReceived, statistics.getEventsReceived()); @@ -1626,12 +1670,12 @@ public class WANTestBase extends DistributedTestCase{ } } final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics(); - assertEquals(queueSize, statistics.getEventQueueSize()); + assertEquals(queueSize, statistics.getEventQueueSize()); assertEquals(eventsReceived, statistics.getEventsReceived()); assertEquals(eventsQueued, statistics.getEventsQueued()); assert(statistics.getEventsDistributed() >= eventsDistributed); } - + public static void checkGatewayReceiverStats(int processBatches, int eventsReceived, int creates) { Set gatewayReceivers = cache.getGatewayReceivers(); @@ -1658,7 +1702,7 @@ public class WANTestBase extends DistributedTestCase{ assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches); assertTrue(gatewayReceiverStats.getEventsReceived()>= eventsReceived); } - + public static void checkExcepitonStats(int exceptionsOccured) { Set gatewayReceivers = cache.getGatewayReceivers(); GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next(); @@ -1689,7 +1733,7 @@ public class WANTestBase extends DistributedTestCase{ assertTrue(gatewayReceiverStats.getEventsReceived() >= eventsReceived); assertTrue(gatewayReceiverStats.getCreateRequest() >= creates); } - + public static void checkEventFilteredStats(String senderId, final int eventsFiltered) { Set senders = cache.getGatewaySenders(); GatewaySender sender = null; @@ -1702,7 +1746,7 @@ public class WANTestBase extends DistributedTestCase{ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); assertEquals(eventsFiltered, statistics.getEventsFiltered()); } - + public static void checkConflatedStats(String senderId, final int eventsConflated) { Set senders = cache.getGatewaySenders(); GatewaySender sender = null; @@ -1715,7 +1759,7 @@ public class WANTestBase extends DistributedTestCase{ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated()); } - + public static void checkAsyncEventQueueConflatedStats( String asyncEventQueueId, final int eventsConflated) { Set queues = cache.getAsyncEventQueues(); @@ -1730,7 +1774,7 @@ public class WANTestBase extends DistributedTestCase{ .getStatistics(); assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated()); } - + public static void checkStats_Failover(String senderId, final int eventsReceived) { Set senders = cache.getGatewaySenders(); @@ -1749,7 +1793,7 @@ public class WANTestBase extends DistributedTestCase{ + statistics.getUnprocessedTokensAddedByPrimary() + statistics .getUnprocessedEventsRemovedByPrimary())); } - + public static void checkAsyncEventQueueStats_Failover(String asyncEventQueueId, final int eventsReceived) { Set asyncEventQueues = cache.getAsyncEventQueues(); @@ -1783,7 +1827,7 @@ public class WANTestBase extends DistributedTestCase{ assert (statistics.getBatchesDistributed() >= batches); assertEquals(0, statistics.getBatchesRedistributed()); } - + public static void checkAsyncEventQueueBatchStats(String asyncQueueId, final int batches) { Set queues = cache.getAsyncEventQueues(); @@ -1835,7 +1879,7 @@ public class WANTestBase extends DistributedTestCase{ (statistics.getUnprocessedEventsRemovedByPrimary() + statistics .getUnprocessedTokensAddedByPrimary())); } - + public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, int events) { Set asyncQueues = cache.getAsyncEventQueues(); AsyncEventQueue queue = null; @@ -1853,34 +1897,7 @@ public class WANTestBase extends DistributedTestCase{ (statistics.getUnprocessedEventsRemovedByPrimary() + statistics .getUnprocessedTokensAddedByPrimary())); } - - - public static void setRemoveFromQueueOnException(String senderId, boolean removeFromQueue){ - Set senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - assertNotNull(sender); - ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(removeFromQueue); - } - - public static void unsetRemoveFromQueueOnException(String senderId){ - Set senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - assertNotNull(sender); - ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(false); - } - + public static void waitForSenderRunningState(String senderId){ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); try { @@ -1904,7 +1921,7 @@ public class WANTestBase extends DistributedTestCase{ exln.remove(); } } - + public static void waitForSenderToBecomePrimary(String senderId){ Set senders = ((GemFireCacheImpl)cache).getAllGatewaySenders(); final GatewaySender sender = getGatewaySenderById(senders, senderId); @@ -1920,9 +1937,9 @@ public class WANTestBase extends DistributedTestCase{ return "Expected sender primary state to be true but is false"; } }; - Wait.waitForCriterion(wc, 10000, 1000, true); + Wait.waitForCriterion(wc, 10000, 1000, true); } - + private static GatewaySender getGatewaySenderById(Set senders, String senderId) { for(GatewaySender s : senders){ if(s.getId().equals(senderId)){ @@ -1932,7 +1949,7 @@ public class WANTestBase extends DistributedTestCase{ //if none of the senders matches with the supplied senderid, return null return null; } - + public static HashMap checkQueue(){ HashMap listenerAttrs = new HashMap(); listenerAttrs.put("Create", listener1.createList); @@ -1940,13 +1957,13 @@ public class WANTestBase extends DistributedTestCase{ listenerAttrs.put("Destroy", listener1.destroyList); return listenerAttrs; } - + public static void checkQueueOnSecondary (final Map primaryUpdatesMap){ final HashMap secondaryUpdatesMap = new HashMap(); secondaryUpdatesMap.put("Create", listener1.createList); secondaryUpdatesMap.put("Update", listener1.updateList); secondaryUpdatesMap.put("Destroy", listener1.destroyList); - + WaitCriterion wc = new WaitCriterion() { public boolean done() { secondaryUpdatesMap.put("Create", listener1.createList); @@ -1962,9 +1979,9 @@ public class WANTestBase extends DistributedTestCase{ return "Expected seconadry map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap; } }; - Wait.waitForCriterion(wc, 300000, 500, true); + Wait.waitForCriterion(wc, 300000, 500, true); } - + public static HashMap checkQueue2(){ HashMap listenerAttrs = new HashMap(); listenerAttrs.put("Create", listener2.createList); @@ -1972,18 +1989,18 @@ public class WANTestBase extends DistributedTestCase{ listenerAttrs.put("Destroy", listener2.destroyList); return listenerAttrs; } - + public static HashMap checkPR(String regionName){ PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); QueueListener listener = (QueueListener)region.getCacheListener(); - + HashMap listenerAttrs = new HashMap(); listenerAttrs.put("Create", listener.createList); listenerAttrs.put("Update", listener.updateList); listenerAttrs.put("Destroy", listener.destroyList); return listenerAttrs; } - + public static HashMap checkBR(String regionName, int numBuckets){ PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); HashMap listenerAttrs = new HashMap(); @@ -1996,7 +2013,7 @@ public class WANTestBase extends DistributedTestCase{ } return listenerAttrs; } - + public static HashMap checkQueue_PR(String senderId){ Set senders = cache.getGatewaySenders(); GatewaySender sender = null; @@ -2006,20 +2023,20 @@ public class WANTestBase extends DistributedTestCase{ break; } } - + RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender) .getQueues().toArray(new RegionQueue[1])[0]; - + PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion(); QueueListener listener = (QueueListener)region.getCacheListener(); - + HashMap listenerAttrs = new HashMap(); listenerAttrs.put("Create", listener.createList); listenerAttrs.put("Update", listener.updateList); listenerAttrs.put("Destroy", listener.destroyList); return listenerAttrs; } - + public static HashMap checkQueue_BR(String senderId, int numBuckets){ Set senders = cache.getGatewaySenders(); GatewaySender sender = null; @@ -2031,7 +2048,7 @@ public class WANTestBase extends DistributedTestCase{ } RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender) .getQueues().toArray(new RegionQueue[1])[0]; - + PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion(); HashMap listenerAttrs = new HashMap(); for (int i = 0; i < numBuckets; i++) { @@ -2052,7 +2069,7 @@ public class WANTestBase extends DistributedTestCase{ WANTestBase test = new WANTestBase(getTestMethodName()); test.addCacheListenerOnBucketRegion(regionName, numBuckets); } - + private void addCacheListenerOnBucketRegion(String regionName, int numBuckets){ PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); for (int i = 0; i < numBuckets; i++) { @@ -2062,12 +2079,12 @@ public class WANTestBase extends DistributedTestCase{ mutator.addCacheListener(listener1); } } - + public static void addListenerOnQueueBucketRegion(String senderId, int numBuckets) { WANTestBase test = new WANTestBase(getTestMethodName()); test.addCacheListenerOnQueueBucketRegion(senderId, numBuckets); } - + private void addCacheListenerOnQueueBucketRegion(String senderId, int numBuckets){ Set senders = cache.getGatewaySenders(); GatewaySender sender = null; @@ -2079,7 +2096,7 @@ public class WANTestBase extends DistributedTestCase{ } RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender) .getQueues().toArray(new RegionQueue[1])[0]; - + PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion(); for (int i = 0; i < numBuckets; i++) { BucketRegion br = region.getBucketRegion(i); @@ -2089,19 +2106,19 @@ public class WANTestBase extends DistributedTestCase{ mutator.addCacheListener(listener); } } - + } - + public static void addQueueListener(String senderId, boolean isParallel){ WANTestBase test = new WANTestBase(getTestMethodName()); test.addCacheQueueListener(senderId, isParallel); } - + public static void addSecondQueueListener(String senderId, boolean isParallel){ WANTestBase test = new WANTestBase(getTestMethodName()); test.addSecondCacheQueueListener(senderId, isParallel); } - + public static void addListenerOnRegion(String regionName){ WANTestBase test = new WANTestBase(getTestMethodName()); test.addCacheListenerOnRegion(regionName); @@ -2112,7 +2129,7 @@ public class WANTestBase extends DistributedTestCase{ listener1 = new QueueListener(); mutator.addCacheListener(listener1); } - + private void addCacheQueueListener(String senderId, boolean isParallel) { Set senders = cache.getGatewaySenders(); GatewaySender sender = null; @@ -2158,7 +2175,7 @@ public class WANTestBase extends DistributedTestCase{ parallelQueue.addCacheListener(listener2); } } - + public static void pauseSender(String senderId) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class @@ -2174,14 +2191,14 @@ public class WANTestBase extends DistributedTestCase{ } sender.pause(); ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause(); - + } finally { exp.remove(); exln.remove(); } } - + public static void pauseSenderAndWaitForDispatcherToPause(String senderId) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class @@ -2200,9 +2217,9 @@ public class WANTestBase extends DistributedTestCase{ } finally { exp.remove(); exln.remove(); - } + } } - + public static void resumeSender(String senderId) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class @@ -2223,7 +2240,7 @@ public class WANTestBase extends DistributedTestCase{ exln.remove(); } } - + public static void stopSender(String senderId) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class @@ -2242,7 +2259,7 @@ public class WANTestBase extends DistributedTestCase{ eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor(); } sender.stop(); - + Set queues = null; if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) { queues = ((ConcurrentSerialGatewaySenderEventProcessor)eventProcessor).getQueues(); @@ -2258,14 +2275,14 @@ public class WANTestBase extends DistributedTestCase{ exln.remove(); } } - + public static void stopReceivers() { Set receivers = cache.getGatewayReceivers(); for (GatewayReceiver receiver : receivers) { receiver.stop(); } } - + public static void startReceivers() { Set receivers = cache.getGatewayReceivers(); for (GatewayReceiver receiver : receivers) { @@ -2276,12 +2293,12 @@ public class WANTestBase extends DistributedTestCase{ } } } - + public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) { - + InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory(); gateway.setParallel(isParallel); gateway.setMaximumQueueMemory(maxMemory); @@ -2305,7 +2322,7 @@ public class WANTestBase extends DistributedTestCase{ } return gateway; } - + public static void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, @@ -2324,7 +2341,7 @@ public class WANTestBase extends DistributedTestCase{ exln.remove(); } } - + public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, @@ -2343,12 +2360,12 @@ public class WANTestBase extends DistributedTestCase{ exln.remove(); } } - + public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, boolean isManulaStart) { - + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); gateway.setParallel(true); gateway.setMaximumQueueMemory(maxMemory); @@ -2359,7 +2376,7 @@ public class WANTestBase extends DistributedTestCase{ gateway.setBatchConflationEnabled(isConflation); gateway.create(dsName, remoteDsId); } - + public static void createConcurrentSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, @@ -2373,16 +2390,16 @@ public class WANTestBase extends DistributedTestCase{ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy); gateway.create(dsName, remoteDsId); } - + // public static void createSender_PDX(String dsName, int remoteDsId, // boolean isParallel, Integer maxMemory, // Integer batchSize, boolean isConflation, boolean isPersistent, // GatewayEventFilter filter, boolean isManulaStart) { // File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum()); // persistentDirectory.mkdir(); -// +// // File [] dirs1 = new File[] {persistentDirectory}; -// +// // if(isParallel) { // ParallelGatewaySenderFactory gateway = cache.createParallelGatewaySenderFactory(); // gateway.setMaximumQueueMemory(maxMemory); @@ -2398,7 +2415,7 @@ public class WANTestBase extends DistributedTestCase{ // } // gateway.setBatchConflationEnabled(isConflation); // gateway.create(dsName, remoteDsId); -// +// // }else { // SerialGatewaySenderFactory gateway = cache.createSerialGatewaySenderFactory(); // gateway.setMaximumQueueMemory(maxMemory); @@ -2502,7 +2519,7 @@ public class WANTestBase extends DistributedTestCase{ exp1.remove(); } } - + public static String createSenderWithDiskStore(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, @@ -2513,14 +2530,14 @@ public class WANTestBase extends DistributedTestCase{ + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); } else { - persistentDirectory = new File(dsStore); + persistentDirectory = new File(dsStore); } LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName()); - + persistentDirectory.mkdir(); DiskStoreFactory dsf = cache.createDiskStoreFactory(); File [] dirs1 = new File[] {persistentDirectory}; - + if(isParallel) { GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); gateway.setParallel(true); @@ -2546,7 +2563,7 @@ public class WANTestBase extends DistributedTestCase{ } gateway.setBatchConflationEnabled(isConflation); gateway.create(dsName, remoteDsId); - + }else { GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); gateway.setMaximumQueueMemory(maxMemory); @@ -2565,15 +2582,15 @@ public class WANTestBase extends DistributedTestCase{ } else { DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - + gateway.setDiskStoreName(store.getName()); } gateway.create(dsName, remoteDsId); } return persistentDirectory.getName(); } - - + + public static void createSenderWithListener(String dsName, int remoteDsName, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, @@ -2641,7 +2658,7 @@ public class WANTestBase extends DistributedTestCase{ ((InternalGatewaySenderFactory)gateway).create(dsName); } } - + public static void pauseWaitCriteria(final long millisec) { WaitCriterion wc = new WaitCriterion() { public boolean done() { @@ -2652,18 +2669,31 @@ public class WANTestBase extends DistributedTestCase{ return "Expected to wait for " + millisec + " millisec."; } }; - Wait.waitForCriterion(wc, millisec, 500, false); + Wait.waitForCriterion(wc, millisec, 500, false); } - - public static int createReceiver(int locPort) { - WANTestBase test = new WANTestBase(getTestMethodName()); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); - props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort - + "]"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); + public static void createReceiverInVMs(int locatorPort, VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> createReceiver(locatorPort)); + } + } + + public static void createReceiverInVMsAsync(int locatorPort, VM... vms) { + List tasks = new LinkedList<>(); + for (VM vm : vms) { + tasks.add(vm.invokeAsync(() -> createReceiver(locatorPort))); + } + for (AsyncInvocation invocation : tasks) { + try { + invocation.join(30000); + } + catch (InterruptedException e) { + fail("Failed starting up the receiver"); + } + } + } + + public static int createReceiver(int locPort) { GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); fact.setStartPort(port); @@ -2675,12 +2705,12 @@ public class WANTestBase extends DistributedTestCase{ } catch (IOException e) { e.printStackTrace(); - fail("Test " + test.getName() + fail("Test " + getTestMethodName() + " failed to start GatewayRecevier on port " + port); } return port; } - + public static void createReceiverWithBindAddress(int locPort) { WANTestBase test = new WANTestBase(getTestMethodName()); Properties props = test.getDistributedSystemProperties(); @@ -2690,7 +2720,7 @@ public class WANTestBase extends DistributedTestCase{ + "]"); InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); + cache = CacheFactory.create(ds); GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); fact.setStartPort(port); @@ -2718,7 +2748,7 @@ public class WANTestBase extends DistributedTestCase{ String gatewaySslprotocols = "any"; String gatewaySslciphers = "any"; boolean gatewaySslRequireAuth = true; - + Properties gemFireProps = test.getDistributedSystemProperties(); gemFireProps.put(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); @@ -2726,22 +2756,22 @@ public class WANTestBase extends DistributedTestCase{ gemFireProps.put(DistributionConfig.GATEWAY_SSL_PROTOCOLS_NAME, gatewaySslprotocols); gemFireProps.put(DistributionConfig.GATEWAY_SSL_CIPHERS_NAME, gatewaySslciphers); gemFireProps.put(DistributionConfig.GATEWAY_SSL_REQUIRE_AUTHENTICATION_NAME, String.valueOf(gatewaySslRequireAuth)); - + gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_TYPE_NAME, "jks"); - gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME, + gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME, TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.keystore")); gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_PASSWORD_NAME, "password"); - gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME, + gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME, TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.truststore")); gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_PASSWORD_NAME, "password"); - + gemFireProps.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); gemFireProps.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]"); LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps); - + InternalDistributedSystem ds = test.getSystem(gemFireProps); - cache = CacheFactory.create(ds); + cache = CacheFactory.create(ds); GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); fact.setStartPort(port); @@ -2758,16 +2788,16 @@ public class WANTestBase extends DistributedTestCase{ } return port; } - + public static String makePath(String[] strings) { StringBuilder sb = new StringBuilder(); for(int i=0;i testQueueSize(senderId, numQueueEntries)); } - + public static void testQueueSize(String senderId, int numQueueEntries) { GatewaySender sender = null; for (GatewaySender s : cache.getGatewaySenders()) { @@ -3468,7 +3498,7 @@ public class WANTestBase extends DistributedTestCase{ assertEquals(numQueueEntries, size); } } - + /** * To be used only for ParallelGatewaySender. * @param senderId Id of the ParallelGatewaySender @@ -3487,7 +3517,7 @@ public class WANTestBase extends DistributedTestCase{ if (sender.isParallel()) { int totalSize = 0; final Set queues = ((AbstractGatewaySender)sender).getQueues(); - + WaitCriterion wc = new WaitCriterion() { int size = 0; public boolean done() { @@ -3505,13 +3535,13 @@ public class WANTestBase extends DistributedTestCase{ return " Expected local queue entries: " + numQueueEntries + " but actual entries: " + size; } - + }; - + Wait.waitForCriterion(wc, 120000, 500, true); } } - + /** * To be used only for ParallelGatewaySender. * @param senderId Id of the ParallelGatewaySender @@ -3535,12 +3565,12 @@ public class WANTestBase extends DistributedTestCase{ } return -1; } - + public static void doUpdates(String regionName, int numUpdates) { Region r = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(r); for (int i = 0; i < numUpdates; i++) { - String s = "K"+i; + String s = "K"+i; r.put(i, s); } } @@ -3554,7 +3584,7 @@ public class WANTestBase extends DistributedTestCase{ r.put(key, s); } } - + public static void doRandomUpdates(String regionName, int numUpdates) { Region r = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(r); @@ -3563,11 +3593,11 @@ public class WANTestBase extends DistributedTestCase{ generatedKeys.add((new Random()).nextInt(r.size())); } for (Integer i: generatedKeys) { - String s = "K"+i; + String s = "K"+i; r.put(i, s); } } - + public static void doMultiThreadedPuts(String regionName, int numPuts) { final AtomicInteger ai = new AtomicInteger(-1); final ExecutorService execService = Executors.newFixedThreadPool(5, @@ -3624,13 +3654,13 @@ public class WANTestBase extends DistributedTestCase{ + " present region keyset " + r.keySet(); } }; - Wait.waitForCriterion(wc, 240000, 500, true); + Wait.waitForCriterion(wc, 30000, 500, true); } finally { exp.remove(); exp1.remove(); } } - + /** * Validate whether all the attributes set on AsyncEventQueueFactory are set * on the sender underneath the AsyncEventQueue. @@ -3662,7 +3692,7 @@ public class WANTestBase extends DistributedTestCase{ assertEquals("batchConflation", batchConflationEnabled, theSender .isBatchConflationEnabled()); } - + public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) { AsyncEventListener theListener = null; @@ -3690,7 +3720,7 @@ public class WANTestBase extends DistributedTestCase{ }; Wait.waitForCriterion(wc, 60000, 500, true); //TODO:Yogs } - + public static void validateCustomAsyncEventListener(String asyncQueueId, final int expectedSize) { AsyncEventListener theListener = null; @@ -3718,14 +3748,14 @@ public class WANTestBase extends DistributedTestCase{ } }; Wait.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs - + Iterator itr = eventsMap.values().iterator(); while (itr.hasNext()) { AsyncEvent event = itr.next(); assertTrue("possibleDuplicate should be true for event: " + event, event.getPossibleDuplicate()); } } - + public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) { AsyncEventQueue theAsyncEventQueue = null; @@ -3795,7 +3825,7 @@ public class WANTestBase extends DistributedTestCase{ Wait.waitForCriterion(wc, 60000, 500, true); } } - + public static void verifyAsyncEventListenerForPossibleDuplicates( String asyncEventQueueId, Set bucketIds, int batchSize) { AsyncEventListener theListener = null; @@ -3840,10 +3870,10 @@ public class WANTestBase extends DistributedTestCase{ LogWriterUtils.getLogWriter().info("The events map size is " + eventsMap.size()); return eventsMap.size(); } - + public static int getAsyncEventQueueSize(String asyncEventQueueId) { AsyncEventQueue theQueue = null; - + Set asyncEventQueues = cache.getAsyncEventQueues(); for (AsyncEventQueue asyncQueue : asyncEventQueues) { if (asyncEventQueueId.equals(asyncQueue.getId())) { @@ -3854,7 +3884,7 @@ public class WANTestBase extends DistributedTestCase{ return theQueue.size(); } - + public static void validateRegionSize_PDX(String regionName, final int regionSize) { final Region r = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(r); @@ -3867,11 +3897,11 @@ public class WANTestBase extends DistributedTestCase{ } public String description() { - + return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet() ; } }; - Wait.waitForCriterion(wc, 200000, 500, true); + Wait.waitForCriterion(wc, 200000, 500, true); for(int i = 0 ; i < regionSize; i++){ LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i)); assertEquals(new SimpleClass(i, (byte)i), r.get("Key_" + i)); @@ -3889,27 +3919,27 @@ public class WANTestBase extends DistributedTestCase{ } public String description() { - + return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet() ; } }; - Wait.waitForCriterion(wc, 200000, 500, true); + Wait.waitForCriterion(wc, 200000, 500, true); for(int i = 0 ; i < regionSize; i++){ LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i)); assertEquals(new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i), r.get("Key_" + i)); } } - + public static void validateQueueSizeStat(String id, final int queueSize) { final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id); - + Wait.waitForCriterion(new WaitCriterion() { - + @Override public boolean done() { return sender.getEventQueueSize() == queueSize; } - + @Override public String description() { // TODO Auto-generated method stub @@ -3921,9 +3951,9 @@ public class WANTestBase extends DistributedTestCase{ /** * This method is specifically written for pause and stop operations. * This method validates that the region size remains same for at least minimum number of verification - * attempts and also it remains below a specified limit value. This validation will suffice for - * testing of pause/stop operations. - * + * attempts and also it remains below a specified limit value. This validation will suffice for + * testing of pause/stop operations. + * * @param regionName * @param regionSizeLimit */ @@ -3944,8 +3974,8 @@ public class WANTestBase extends DistributedTestCase{ } else { return false; } - - } else { //current regionSize is not same as recorded previous regionSize + + } else { //current regionSize is not same as recorded previous regionSize previousSize = r.keySet().size(); //update the previousSize variable with current region size sameRegionSizeCounter = 0;//reset the sameRegionSizeCounter return false; @@ -3956,21 +3986,21 @@ public class WANTestBase extends DistributedTestCase{ return "Expected region size to remain same below a specified limit but actual region size does not remain same or exceeded the specified limit " + sameRegionSizeCounter + " :regionSize " + previousSize; } }; - Wait.waitForCriterion(wc, 200000, 500, true); + Wait.waitForCriterion(wc, 200000, 500, true); } - + public static String getRegionFullPath(String regionName) { final Region r = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(r); return r.getFullPath(); } - + public static Integer getRegionSize(String regionName) { final Region r = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(r); return r.keySet().size(); } - + public static void validateRegionContents(String regionName, final Map keyValues) { final Region r = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(r); @@ -3991,9 +4021,9 @@ public class WANTestBase extends DistributedTestCase{ return "Expected region entries doesn't match"; } }; - Wait.waitForCriterion(wc, 120000, 500, true); + Wait.waitForCriterion(wc, 120000, 500, true); } - + public static void CheckContent(String regionName, final int regionSize) { final Region r = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(r); @@ -4001,7 +4031,7 @@ public class WANTestBase extends DistributedTestCase{ assertEquals(i, r.get(i)); } } - + public static void validateRegionContentsForPR(String regionName, final int regionSize) { final Region r = cache.getRegion(Region.SEPARATOR + regionName); @@ -4018,14 +4048,14 @@ public class WANTestBase extends DistributedTestCase{ return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size(); } }; - Wait.waitForCriterion(wc, 120000, 500, true); + Wait.waitForCriterion(wc, 120000, 500, true); } - + public static void verifyPrimaryStatus(final Boolean isPrimary) { final Set senders = cache.getGatewaySenders(); assertEquals(senders.size(), 1); final AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next(); - + WaitCriterion wc = new WaitCriterion() { public boolean done() { if (sender.isPrimary() == isPrimary.booleanValue()) { @@ -4040,7 +4070,7 @@ public class WANTestBase extends DistributedTestCase{ }; Wait.waitForCriterion(wc, 120000, 500, true); } - + public static Boolean getPrimaryStatus(){ Set senders = cache.getGatewaySenders(); assertEquals(senders.size(), 1); @@ -4060,12 +4090,12 @@ public class WANTestBase extends DistributedTestCase{ Wait.waitForCriterion(wc, 10000, 500, false); return sender.isPrimary(); } - + public static Set getAllPrimaryBucketsOnTheNode(String regionName) { PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); return region.getDataStore().getAllLocalPrimaryBucketIds(); } - + public static void doHeavyPuts(String regionName, int numPuts) { Region r = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(r); @@ -4075,7 +4105,7 @@ public class WANTestBase extends DistributedTestCase{ r.put(i, new byte[1024*1024]); } } - + public static void addListenerAndKillPrimary(){ Set senders = ((GemFireCacheImpl)cache).getAllGatewaySenders(); assertEquals(senders.size(), 1); @@ -4084,7 +4114,7 @@ public class WANTestBase extends DistributedTestCase{ assertNotNull(queue); CacheListenerAdapter cl = new CacheListenerAdapter() { public void afterCreate(EntryEvent event) { - if((Long)event.getKey() > 900){ + if((Long)event.getKey() > 900){ cache.getLogger().fine(" Gateway sender is killed by a test"); cache.close(); cache.getDistributedSystem().disconnect(); @@ -4093,28 +4123,28 @@ public class WANTestBase extends DistributedTestCase{ }; queue.getAttributesMutator().addCacheListener(cl); } - + public static void addCacheListenerAndDestroyRegion(String regionName){ final Region region = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(region); CacheListenerAdapter cl = new CacheListenerAdapter() { @Override public void afterCreate(EntryEvent event) { - if((Long)event.getKey() == 99){ + if((Long)event.getKey() == 99){ region.destroyRegion(); } } }; region.getAttributesMutator().addCacheListener(cl); } - + public static void addCacheListenerAndCloseCache(String regionName){ final Region region = cache.getRegion(Region.SEPARATOR + regionName); assertNotNull(region); CacheListenerAdapter cl = new CacheListenerAdapter() { @Override public void afterCreate(EntryEvent event) { - if((Long)event.getKey() == 900){ + if((Long)event.getKey() == 900){ cache.getLogger().fine(" Gateway sender is killed by a test"); cache.close(); cache.getDistributedSystem().disconnect(); @@ -4123,7 +4153,7 @@ public class WANTestBase extends DistributedTestCase{ }; region.getAttributesMutator().addCacheListener(cl); } - + public static Boolean killSender(String senderId){ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); IgnoredException exp = IgnoredException.addIgnoredException(CacheClosedException.class @@ -4149,9 +4179,9 @@ public class WANTestBase extends DistributedTestCase{ exp.remove(); exp1.remove(); exln.remove(); - } + } } - + public static Boolean killAsyncEventQueue(String asyncQueueId){ Set queues = cache.getAsyncEventQueues(); AsyncEventQueueImpl queue = null; @@ -4168,14 +4198,14 @@ public class WANTestBase extends DistributedTestCase{ } return Boolean.FALSE; } - + public static void killSender(){ - LogWriterUtils.getLogWriter().info("Gateway sender is going to be killed by a test"); + LogWriterUtils.getLogWriter().info("Gateway sender is going to be killed by a test"); cache.close(); cache.getDistributedSystem().disconnect(); LogWriterUtils.getLogWriter().info("Gateway sender is killed by a test"); } - + static void waitForSitesToUpdate() { WaitCriterion wc = new WaitCriterion() { public boolean done() { @@ -4187,21 +4217,21 @@ public class WANTestBase extends DistributedTestCase{ }; Wait.waitForCriterion(wc, 10000, 500, false); } - - public static void checkAllSiteMetaData( + + public static void checkAllSiteMetaData( Map> dsVsPorts) { waitForSitesToUpdate(); assertNotNull(getSystemStatic()); // Map> allSiteMetaData = ((DistributionConfigImpl)system // .getConfig()).getAllServerLocatorsInfo(); - + List locatorsConfigured = Locator.getLocators(); Locator locator = locatorsConfigured.get(0); Map> allSiteMetaData = ((InternalLocator)locator).getlocatorMembershipListener().getAllLocatorsInfo(); System.out.println("allSiteMetaData : " + allSiteMetaData); System.out.println("dsVsPorts : " + dsVsPorts); System.out.println("Server allSiteMetaData : " + ((InternalLocator)locator).getlocatorMembershipListener().getAllServerLocatorsInfo()); - + //assertEquals(dsVsPorts.size(), allSiteMetaData.size()); for (Map.Entry> entry : dsVsPorts.entrySet()) { Set locators = allSiteMetaData.get(entry.getKey()); @@ -4219,9 +4249,9 @@ public class WANTestBase extends DistributedTestCase{ } } } - + public static Long checkAllSiteMetaDataFor3Sites(final Map> dsVsPort) { - + WaitCriterion wc = new WaitCriterion() { public boolean done() { if (getSystemStatic() != null) { @@ -4236,12 +4266,12 @@ public class WANTestBase extends DistributedTestCase{ return "Making sure system is initialized"; } }; - Wait.waitForCriterion(wc, 50000, 1000, true); + Wait.waitForCriterion(wc, 50000, 1000, true); assertNotNull(getSystemStatic()); - + // final Map> allSiteMetaData = ((DistributionConfigImpl)system // .getConfig()).getAllServerLocatorsInfo(); - + List locatorsConfigured = Locator.getLocators(); Locator locator = locatorsConfigured.get(0); LocatorMembershipListener listener = ((InternalLocator)locator).getlocatorMembershipListener(); @@ -4250,7 +4280,7 @@ public class WANTestBase extends DistributedTestCase{ } final Map> allSiteMetaData = listener.getAllLocatorsInfo(); System.out.println("allSiteMetaData : " + allSiteMetaData); - + wc = new WaitCriterion() { public boolean done() { if (dsVsPort.size() == allSiteMetaData.size()) { @@ -4275,10 +4305,10 @@ public class WANTestBase extends DistributedTestCase{ + " but actual meta data: " + allSiteMetaData; } }; - Wait.waitForCriterion(wc, 300000, 500, true); + Wait.waitForCriterion(wc, 300000, 500, true); return System.currentTimeMillis(); } - + public static void checkLocatorsinSender(String senderId, InetSocketAddress locatorToWaitFor) throws InterruptedException { @@ -4290,17 +4320,17 @@ public class WANTestBase extends DistributedTestCase{ break; } } - + MyLocatorCallback callback = (MyLocatorCallbac