geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinmeil...@apache.org
Subject [geode] branch develop updated: GEODE-4118/GEODE-5406: rework ClientHealthStatsDUnitTest (#2203)
Date Fri, 27 Jul 2018 15:56:51 GMT
This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new fe19efe  GEODE-4118/GEODE-5406: rework ClientHealthStatsDUnitTest (#2203)
fe19efe is described below

commit fe19efe01f35c43129d678af745415022c1190e1
Author: jinmeiliao <jiliao@pivotal.io>
AuthorDate: Fri Jul 27 08:56:46 2018 -0700

    GEODE-4118/GEODE-5406: rework ClientHealthStatsDUnitTest (#2203)
---
 .../management/ClientHealthStatsDUnitTest.java     | 412 +++++++--------------
 .../geode/test/dunit/rules/ClusterStartupRule.java |  16 +-
 .../apache/geode/test/dunit/rules/MemberVM.java    |   7 +
 .../dunit/rules/tests/MemberStarterRuleTest.java   |   9 +
 .../geode/test/junit/rules/LocatorStarterRule.java |  20 +-
 .../geode/test/junit/rules/MemberStarterRule.java  |  27 +-
 6 files changed, 201 insertions(+), 290 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
index 68fc6c7..2336940 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
@@ -12,352 +12,210 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
+
 package org.apache.geode.management;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
 import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
-import static org.apache.geode.test.dunit.Host.getHost;
-import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
-import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
-import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.Properties;
-
-import javax.management.ObjectName;
 
 import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionFactory;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
-import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.test.dunit.VM;
-
-/**
- * Distributed tests for client stats exposed via {@link CacheServerMXBean}:
- * <ul>
- * <li>{@link CacheServerMXBean#showClientStats}
- * <li>{@link CacheServerMXBean#showAllClientStats}
- * <li>{@link CacheServerMXBean#showClientQueueDetails}
- * </ul>
- */
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.VMProvider;
 
-@SuppressWarnings({"serial", "unused"})
 public class ClientHealthStatsDUnitTest implements Serializable {
-
   private static final int NUMBER_PUTS = 100;
 
-  private static final String KEY1 = "KEY1";
-  private static final String KEY2 = "KEY2";
-  private static final String VALUE1 = "VALUE1";
-  private static final String VALUE2 = "VALUE2";
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
 
-  private static final String REGION_NAME =
-      ClientHealthStatsDUnitTest.class.getSimpleName() + "_Region";
+  private MemberVM locator;
+  private MemberVM server;
+  private ClientVM client1;
+  private ClientVM client2;
 
-  // client1VM and client2VM VM fields
-  private static volatile ClientCache clientCache;
   private static volatile boolean lastKeyReceived;
 
-  private VM managerVM;
-  private VM serverVM;
-  private VM client1VM;
-  private VM client2VM;
-
-  private String hostName;
-
-  @Rule
-  public ManagementTestRule managementTestRule = ManagementTestRule.builder().build();
-
   @Before
-  public void before() throws Exception {
-    this.hostName = getServerHostName(getHost(0));
-
-    this.managerVM = getHost(0).getVM(0);
-    this.serverVM = getHost(0).getVM(1);
-    this.client1VM = getHost(0).getVM(2);
-    this.client2VM = getHost(0).getVM(3);
-
-    addIgnoredException("Connection reset");
-  }
-
-  @After
-  public void after() throws Exception {
-    invokeInEveryVM(() -> {
-      lastKeyReceived = false;
-      clientCache = null;
-    });
+  public void before() {
+    locator =
+        cluster.startLocatorVM(0, r -> r.withoutClusterConfigurationService().withoutHttpService());
+    server = cluster.startServerVM(1, s -> s.withRegion(RegionShortcut.REPLICATE, "regionA")
+        .withConnectionToLocator(locator.getPort()));
   }
 
   @Test
   public void testClientHealthStats_SubscriptionEnabled() throws Exception {
-    this.managementTestRule.createManager(this.managerVM, false);
-    this.managementTestRule.startManager(this.managerVM);
-
-    int port = this.serverVM.invoke(() -> createServerCache());
-
-    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
-    this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, true));
-
-    DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
-    this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2));
-    this.managementTestRule.stopManager(this.managerVM);
+    client1 = cluster.startClientVM(2, true, server.getPort());
+    client2 = cluster.startClientVM(3, true, server.getPort());
+
+    VMProvider.invokeInEveryMember(() -> {
+      ClientRegionFactory<String, String> regionFactory =
+          ClusterStartupRule.getClientCache()
+              .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+      Region<String, String> region = regionFactory.create("regionA");
+      // need to do some operation in order for the clients to be registered in mBean
+      region.put("1", "1");
+    }, client1, client2);
+
+    locator.waitTillClientsAreReadyOnServers(server.getName(), server.getPort(), 2);
+    verifyClientsAndSubscription(2);
   }
 
   @Test
   public void testClientHealthStats_SubscriptionDisabled() throws Exception {
-    this.managementTestRule.createManager(this.managerVM, false);
-    this.managementTestRule.startManager(this.managerVM);
-
-    int port = this.serverVM.invoke(() -> createServerCache());
-
-    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, false));
-    this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, false));
-
-    DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
-    this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 0));
-    this.managementTestRule.stopManager(this.managerVM);
+    client1 = cluster.startClientVM(2, false, server.getPort());
+    client2 = cluster.startClientVM(3, false, server.getPort());
+    VMProvider.invokeInEveryMember(() -> {
+      ClientRegionFactory<String, String> regionFactory =
+          ClusterStartupRule.getClientCache()
+              .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+      regionFactory.create("regionA");
+    }, client1, client2);
+
+    locator.waitTillClientsAreReadyOnServers(server.getName(), server.getPort(), 2);
+    verifyClientsAndSubscription(0);
   }
 
   @Test
   public void testClientHealthStats_DurableClient() throws Exception {
-    this.managementTestRule.createManager(this.managerVM, false);
-    this.managementTestRule.startManager(this.managerVM);
-
-    int port = this.serverVM.invoke(() -> createServerCache());
-
-    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
-    this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, true));
+    client1 = createDurableClient(2);
+    client2 = createDurableClient(3);
 
-    this.client1VM.invoke(() -> clientCache.close(true));
-    this.client2VM.invoke(() -> clientCache.close(true));
-
-    DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
-    this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2));
-    this.managementTestRule.stopManager(this.managerVM);
+    locator.waitTillClientsAreReadyOnServers(server.getName(), server.getPort(), 2);
+    VMProvider.invokeInEveryMember(() -> ClusterStartupRule.getClientCache().close(true), client1,
+        client2);
+    verifyClientsAndSubscription(2);
   }
 
   @Test
   public void testStatsMatchWithSize() throws Exception {
-    // start a serverVM
-    int port = this.serverVM.invoke(() -> createServerCache());
+    // create durable client, with durable RI
+    client1 = createDurableClient(2);
 
-    // create durable client1VM, with durable RI
-    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
+    // do puts in server
+    server.invoke(() -> {
+      Region<String, String> region = ClusterStartupRule.getCache().getRegion("/regionA");
 
-    // do puts on serverVM from three different threads, pause after 500 puts each.
-    this.serverVM.invoke(() -> doPuts());
+      Thread thread1 = new Thread(() -> {
+        for (int i = 0; i < NUMBER_PUTS; i++) {
+          region.put("T1_KEY_" + i, "VALUE_" + i);
+        }
+      });
+      Thread thread2 = new Thread(() -> {
+        for (int i = 0; i < NUMBER_PUTS; i++) {
+          region.put("T2_KEY_" + i, "VALUE_" + i);
+        }
+      });
+      Thread thread3 = new Thread(() -> {
+        for (int i = 0; i < NUMBER_PUTS; i++) {
+          region.put("T3_KEY_" + i, "VALUE_" + i);
+        }
+      });
 
-    // close durable client1VM
-    this.client1VM.invoke(() -> clientCache.close(true));
+      thread1.start();
+      thread2.start();
+      thread3.start();
 
-    this.serverVM.invoke(() -> await().until(() -> cacheClientProxyHasBeenPause()));
+      thread1.join();
+      thread2.join();
+      thread3.join();
+    });
 
-    // resume puts on serverVM, add another 100.
-    this.serverVM.invoke(() -> resumePuts());
+    // close durable client1
+    client1.invoke(() -> ClusterStartupRule.getClientCache().close(true));
+    server.waitTillCacheClientProxyHasBeenPaused();
 
-    // start durable client1VM
-    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
+    // resume puts on serverVM, add another 100.
+    server.invoke(() -> {
+      Region<String, String> region = ClusterStartupRule.getCache().getRegion("/regionA");
+      for (int i = 0; i < NUMBER_PUTS; i++) {
+        region.put("NEWKEY_" + i, "NEWVALUE_" + i);
+      }
+      region.put("last_key", "last_value");
+    });
 
+    // start durable client1 again
+    client1 = createDurableClient(2);
     // wait for full queue dispatch
-    this.client1VM.invoke(() -> await().until(() -> lastKeyReceived));
+    client1.invoke(() -> Awaitility.await().until(() -> lastKeyReceived));
 
     // verify the stats
-    this.serverVM.invoke(() -> verifyStats(port));
-  }
-
-  /**
-   * Invoked in serverVM
-   */
-  private boolean cacheClientProxyHasBeenPause() {
-    CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
-    Collection<CacheClientProxy> clientProxies = clientNotifier.getClientProxies();
+    server.invoke(() -> {
+      ManagementService service = ClusterStartupRule.memberStarter.getManagementService();
+      CacheServerMXBean cacheServerMXBean = service.getLocalCacheServerMXBean(server.getPort());
 
-    for (CacheClientProxy clientProxy : clientProxies) {
-      if (clientProxy.isPaused()) {
-        return true;
-      }
-    }
-    return false;
-  }
+      CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+      CacheClientProxy clientProxy = clientNotifier.getClientProxies().iterator().next();
+      assertThat(clientProxy.getQueueSizeStat()).isEqualTo(clientProxy.getQueueSize());
 
-  /**
-   * Invoked in serverVM
-   */
-  private int createServerCache() throws IOException {
-    Cache cache = this.managementTestRule.getCache();
-
-    RegionFactory<String, String> regionFactory =
-        cache.createRegionFactory(RegionShortcut.REPLICATE);
-    regionFactory.setConcurrencyChecksEnabled(false);
-    regionFactory.create(REGION_NAME);
-
-    CacheServer cacheServer = cache.addCacheServer();
-    cacheServer.setPort(0);
-    cacheServer.start();
-    return cacheServer.getPort();
+      ClientQueueDetail queueDetails = cacheServerMXBean.showClientQueueDetails()[0];
+      assertThat((int) queueDetails.getQueueSize()).isEqualTo(clientProxy.getQueueSizeStat());
+    });
   }
 
-  /**
-   * Invoked in client1VM and client2VM
-   */
-  private void createClientCache(final String hostName, final Integer port, final int clientNum,
-      final boolean subscriptionEnabled) {
-    Properties props = new Properties();
-    props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
-
-    ClientCacheFactory cacheFactory = new ClientCacheFactory(props);
-    if (subscriptionEnabled) {
-      cacheFactory.setPoolSubscriptionEnabled(true);
-      cacheFactory.setPoolSubscriptionAckInterval(50);
-      cacheFactory.setPoolSubscriptionRedundancy(0);
-    }
-
-    cacheFactory.set(DURABLE_CLIENT_ID, "DurableClientId_" + clientNum);
-    cacheFactory.set(DURABLE_CLIENT_TIMEOUT, "" + 30000);
-
-    cacheFactory.addPoolServer(hostName, port);
-    clientCache = cacheFactory.create();
-
-    ClientRegionFactory<String, String> regionFactory =
-        clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
-    regionFactory.setConcurrencyChecksEnabled(false);
-
-    regionFactory.addCacheListener(new CacheListenerAdapter<String, String>() {
-      @Override
-      public void afterCreate(final EntryEvent<String, String> event) {
-        if ("last_key".equals(event.getKey())) {
-          lastKeyReceived = true;
-        }
-      }
+  private ClientVM createDurableClient(int index) throws Exception {
+    ClientVM client = cluster.startClientVM(index, ccf -> {
+      ccf.setPoolSubscriptionEnabled(true);
+      ccf.addPoolServer("localhost", server.getPort());
+      ccf.set(DURABLE_CLIENT_ID, "client" + index);
+      ccf.set(DURABLE_CLIENT_TIMEOUT, "" + 30000);
     });
 
-    Region<String, String> region = regionFactory.create(REGION_NAME);
-    if (subscriptionEnabled) {
+    client.invoke(() -> {
+      ClientRegionFactory<String, String> regionFactory = ClusterStartupRule.getClientCache()
+          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+      regionFactory.setConcurrencyChecksEnabled(false);
+
+      regionFactory.addCacheListener(new CacheListenerAdapter<String, String>() {
+        @Override
+        public void afterCreate(final EntryEvent<String, String> event) {
+          if ("last_key".equals(event.getKey())) {
+            lastKeyReceived = true;
+          }
+        }
+      });
+      Region<String, String> region = regionFactory.create("regionA");
       region.registerInterest("ALL_KEYS", true);
-      clientCache.readyForEvents();
-    }
-  }
-
-  /**
-   * Invoked in serverVM
-   */
-  private void doPuts() throws InterruptedException {
-    Cache cache = this.managementTestRule.getCache();
-    Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
-
-    Thread thread1 = new Thread(() -> {
-      for (int i = 0; i < NUMBER_PUTS; i++) {
-        region.put("T1_KEY_" + i, "VALUE_" + i);
-      }
-    });
-    Thread thread2 = new Thread(() -> {
-      for (int i = 0; i < NUMBER_PUTS; i++) {
-        region.put("T2_KEY_" + i, "VALUE_" + i);
-      }
-    });
-    Thread thread3 = new Thread(() -> {
-      for (int i = 0; i < NUMBER_PUTS; i++) {
-        region.put("T3_KEY_" + i, "VALUE_" + i);
-      }
+      ClusterStartupRule.getClientCache().readyForEvents();
     });
-
-    thread1.start();
-    thread2.start();
-    thread3.start();
-
-    thread1.join();
-    thread2.join();
-    thread3.join();
+    return client;
   }
 
-  /**
-   * Invoked in serverVM
-   */
-  private void resumePuts() {
-    Cache cache = this.managementTestRule.getCache();
-    Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
-
-    for (int i = 0; i < NUMBER_PUTS; i++) {
-      region.put("NEWKEY_" + i, "NEWVALUE_" + i);
-    }
-    region.put("last_key", "last_value");
-  }
-
-  /**
-   * Invoked in managerVM
-   */
-  private void verifyClientStats(final DistributedMember serverMember, final int serverPort,
-      final int numSubscriptions) throws Exception {
-    ManagementService service = this.managementTestRule.getManagementService();
-    CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember, serverPort);
-
-    await().until(() -> cacheServerMXBean.getClientIds().length == 2);
-    String[] clientIds = cacheServerMXBean.getClientIds();
+  private void verifyClientsAndSubscription(int subscriptionCount) {
+    locator.invoke(() -> {
+      CacheServerMXBean bean =
+          ClusterStartupRule.memberStarter.getCacheServerMXBean(server.getName(), server.getPort());
+      String[] clientIds = bean.getClientIds();
 
-    ClientHealthStatus[] clientStatuses = cacheServerMXBean.showAllClientStats();
-
-    ClientHealthStatus clientStatus1 = cacheServerMXBean.showClientStats(clientIds[0]);
-    ClientHealthStatus clientStatus2 = cacheServerMXBean.showClientStats(clientIds[1]);
-    assertThat(clientStatus1).isNotNull();
-    assertThat(clientStatus2).isNotNull();
-
-    assertThat(clientStatuses).isNotNull().hasSize(2);
-
-    DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
-    assertThat(dsBean.getNumClients()).isEqualTo(2);
-    assertThat(dsBean.getNumSubscriptions()).isEqualTo(numSubscriptions);
-  }
+      ClientHealthStatus[] clientStatuses = bean.showAllClientStats();
+      assertThat(clientStatuses).isNotNull().hasSize(2);
 
-  /**
-   * Invoked in serverVM
-   */
-  private void verifyStats(final int serverPort) throws Exception {
-    ManagementService service = this.managementTestRule.getManagementService();
-    CacheServerMXBean cacheServerMXBean = service.getLocalCacheServerMXBean(serverPort);
+      ClientHealthStatus clientStatus1 = bean.showClientStats(clientIds[0]);
+      ClientHealthStatus clientStatus2 = bean.showClientStats(clientIds[1]);
+      assertThat(clientStatus1).isNotNull();
+      assertThat(clientStatus2).isNotNull();
 
-    CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
-    CacheClientProxy clientProxy = clientNotifier.getClientProxies().iterator().next();
-    assertThat(clientProxy.getQueueSizeStat()).isEqualTo(clientProxy.getQueueSize());
-
-    ClientQueueDetail queueDetails = cacheServerMXBean.showClientQueueDetails()[0];
-    assertThat((int) queueDetails.getQueueSize()).isEqualTo(clientProxy.getQueueSizeStat());
-  }
-
-  private CacheServerMXBean awaitCacheServerMXBean(final DistributedMember serverMember,
-      final int port) {
-    SystemManagementService service = this.managementTestRule.getSystemManagementService();
-    ObjectName objectName = service.getCacheServerMBeanName(port, serverMember);
-
-    await().until(
-        () -> assertThat(service.getMBeanProxy(objectName, CacheServerMXBean.class)).isNotNull());
-
-    return service.getMBeanProxy(objectName, CacheServerMXBean.class);
-  }
-
-  private ConditionFactory await() {
-    return Awaitility.await().atMost(2, MINUTES);
+      DistributedSystemMXBean dsBean =
+          ClusterStartupRule.memberStarter.getManagementService().getDistributedSystemMXBean();
+      assertThat(dsBean.getNumClients()).isEqualTo(2);
+      assertThat(dsBean.getNumSubscriptions()).isEqualTo(subscriptionCount);
+    });
   }
 }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
index e5fc1be..234d90e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
@@ -284,25 +284,23 @@ public class ClusterStartupRule extends ExternalResource implements Serializable
     props.setProperty(UserPasswordAuthInit.PASSWORD, password);
     props.setProperty(SECURITY_CLIENT_AUTH_INIT, UserPasswordAuthInit.class.getName());
 
-    SerializableConsumerIF<ClientCacheFactory> consumer = ((cacheFactory) -> {
-      cacheFactory.setPoolSubscriptionEnabled(subscriptionEnabled);
+    return startClientVM(index, props, (ccf) -> {
+      ccf.setPoolSubscriptionEnabled(subscriptionEnabled);
       for (int serverPort : serverPorts) {
-        cacheFactory.addPoolServer("localhost", serverPort);
+        ccf.addPoolServer("localhost", serverPort);
       }
     });
-    return startClientVM(index, props, consumer);
   }
 
   // convenient startClientMethod
   public ClientVM startClientVM(int index, boolean subscriptionEnabled, int... serverPorts)
       throws Exception {
-    SerializableConsumerIF<ClientCacheFactory> consumer = ((cacheFactory) -> {
-      cacheFactory.setPoolSubscriptionEnabled(subscriptionEnabled);
-      for (int serverPort : serverPorts) {
-        cacheFactory.addPoolServer("localhost", serverPort);
+    return startClientVM(index, ccf -> {
+      ccf.setPoolSubscriptionEnabled(subscriptionEnabled);
+      for (int port : serverPorts) {
+        ccf.addPoolServer("localhost", port);
       }
     });
-    return startClientVM(index, consumer);
   }
 
   /**
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/MemberVM.java b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/MemberVM.java
index d7d4eae..1fe04c8 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/MemberVM.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/MemberVM.java
@@ -143,6 +143,9 @@ public class MemberVM extends VMProvider implements Member {
   }
 
 
+  /**
+   * this can only be called on a locator (or a vm that is not that serverName)
+   */
   public void waitTillClientsAreReadyOnServers(String serverName, int serverPort, int clientCount) {
     vm.invoke(() -> ClusterStartupRule.memberStarter.waitTillClientsAreReadyOnServer(serverName,
         serverPort, clientCount));
@@ -166,4 +169,8 @@ public class MemberVM extends VMProvider implements Member {
         .waitUntilGatewaySendersAreReadyOnExactlyThisManyServers(expectedGatewayObjectCount));
   }
 
+  public void waitTillCacheClientProxyHasBeenPaused() {
+    vm.invoke(() -> ClusterStartupRule.memberStarter.waitTillCacheClientProxyHasBeenPaused());
+  }
+
 }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/MemberStarterRuleTest.java b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/MemberStarterRuleTest.java
index 018c5e4..05853ae 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/MemberStarterRuleTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/MemberStarterRuleTest.java
@@ -110,4 +110,13 @@ public class MemberStarterRuleTest {
 
     assertThat(locator.getWorkingDir()).isNotNull();
   }
+
+  @Test
+  public void httpPort() {
+    locator = new LocatorStarterRule().withoutHttpService();
+    locator.before();
+
+    assertThat(locator.getHttpPort()).isEqualTo(0);
+    assertThat(locator.getJmxPort()).isGreaterThan(0);
+  }
 }
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java b/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
index 10e9a7a..08e6fa4 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.test.junit.rules;
 
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
 import static org.apache.geode.distributed.Locator.startLocatorAndDS;
 import static org.junit.Assert.assertTrue;
 
@@ -42,6 +44,12 @@ import org.apache.geode.internal.cache.InternalCache;
  * information, working dir, name, and the InternalLocator it creates.
  *
  * <p>
+ * by default the rule starts a locator with jmx and http service and cluster configuration service
+ * you can turn off the http service and cluster configuration service to have your test
+ * run faster if your test does not need them.
+ * </p>
+ *
+ * <p>
  * If you need a rule to start a server/locator in different VMs for Distributed tests, You should
  * use {@code LocatorServerStartupRule}.
  */
@@ -72,8 +80,6 @@ public class LocatorStarterRule extends MemberStarterRule<LocatorStarterRule> im
     }
   }
 
-
-
   public void startLocator() {
     try {
       // this will start a jmx manager and admin rest service by default
@@ -95,6 +101,16 @@ public class LocatorStarterRule extends MemberStarterRule<LocatorStarterRule> im
     }
   }
 
+  public LocatorStarterRule withoutHttpService() {
+    properties.put(HTTP_SERVICE_PORT, "0");
+    return this;
+  }
+
+  public LocatorStarterRule withoutClusterConfigurationService() {
+    properties.put(ENABLE_CLUSTER_CONFIGURATION, "false");
+    return this;
+  }
+
   @Override
   public InternalCache getCache() {
     return locator.getCache();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/MemberStarterRule.java b/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
index 11b4904..953429d 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
@@ -35,6 +35,7 @@ import java.io.File;
 import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
@@ -56,6 +57,8 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
 import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.management.CacheServerMXBean;
 import org.apache.geode.management.DistributedRegionMXBean;
@@ -192,8 +195,11 @@ public abstract class MemberStarterRule<T> extends SerializableExternalResource
   }
 
   public T withName(String name) {
-    this.name = name;
-    properties.putIfAbsent(NAME, name);
+    // only if name is not defined yet
+    if (!properties.containsKey(NAME)) {
+      this.name = name;
+      properties.putIfAbsent(NAME, name);
+    }
     return (T) this;
   }
 
@@ -316,6 +322,23 @@ public abstract class MemberStarterRule<T> extends SerializableExternalResource
     await().atMost(1, TimeUnit.MINUTES).until(() -> bean.getClientIds().length == clientCount);
   }
 
+  /**
+   * Invoked in serverVM
+   */
+  public void waitTillCacheClientProxyHasBeenPaused() {
+    await().until(() -> {
+      CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+      Collection<CacheClientProxy> clientProxies = clientNotifier.getClientProxies();
+
+      for (CacheClientProxy clientProxy : clientProxies) {
+        if (clientProxy.isPaused()) {
+          return true;
+        }
+      }
+      return false;
+    });
+  }
+
   public void waitTillCacheServerIsReady(String serverName, int serverPort) {
     await().atMost(1, TimeUnit.MINUTES)
         .until(() -> getCacheServerMXBean(serverName, serverPort) != null);


Mime
View raw message