geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [10/11] geode git commit: Convert from ManagementTestCase to ManagementTestRule
Date Mon, 06 Feb 2017 23:26:30 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/8f48a585/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
index a3c8b27..5142571 100644
--- a/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
@@ -14,13 +14,26 @@
  */
 package org.apache.geode.management;
 
+import static java.util.concurrent.TimeUnit.*;
 import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.apache.geode.test.dunit.Assert.*;
-
+import static org.apache.geode.test.dunit.Host.*;
+import static org.apache.geode.test.dunit.IgnoredException.*;
+import static org.apache.geode.test.dunit.Invoke.*;
+import static org.apache.geode.test.dunit.NetworkUtils.*;
+import static org.assertj.core.api.Assertions.*;
+
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Properties;
 
+import javax.management.ObjectName;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -29,438 +42,373 @@ import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.management.internal.SystemManagementService;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 
 /**
  * Client health stats check
  */
 @Category(DistributedTest.class)
-@SuppressWarnings("serial")
-public class ClientHealthStatsDUnitTest extends JUnit4DistributedTestCase {
+@SuppressWarnings({"serial", "unused"})
+public class ClientHealthStatsDUnitTest implements Serializable {
 
-  private static final String k1 = "k1";
-  private static final String k2 = "k2";
-  private static final String client_k1 = "client-k1";
-  private static final String client_k2 = "client-k2";
+  private static final int NUMBER_PUTS = 100;
 
-  /** name of the test region */
-  private static final String REGION_NAME = "ClientHealthStatsDUnitTest_Region";
+  private static final String KEY1 = "KEY1";
+  private static final String KEY2 = "KEY2";
+  private static final String VALUE1 = "VALUE1";
+  private static final String VALUE2 = "VALUE2";
 
-  private static VM client = null;
-  private static VM client2 = null;
-  private static VM managingNode = null;
+  private static final String REGION_NAME =
+      ClientHealthStatsDUnitTest.class.getSimpleName() + "_Region";
 
-  private static ManagementTestBase helper = new ManagementTestBase() {};
+  // static state that lives in the client1VM and client2VM JVMs
+  private static ClientCache clientCache;
 
-  private static int numOfCreates = 0;
-  private static int numOfUpdates = 0;
-  private static int numOfInvalidates = 0;
-  private static boolean lastKeyReceived = false;
+  // TODO: assert following values in each client VM
+  private static int numOfCreates;
+  private static int numOfUpdates;
+  private static int numOfInvalidates;
+  private static boolean lastKeyReceived;
 
-  private static GemFireCacheImpl cache = null;
+  private VM managerVM;
+  private VM serverVM;
+  private VM client1VM;
+  private VM client2VM;
 
-  private VM server = null;
+  private String hostName;
 
-  @Override
-  public final void postSetUp() throws Exception {
-    disconnectAllFromDS();
+  @Rule
+  public ManagementTestRule managementTestRule = ManagementTestRule.builder().build();
 
-    final Host host = Host.getHost(0);
-    managingNode = host.getVM(0);
-    server = host.getVM(1);
-    client = host.getVM(2);
-    client2 = host.getVM(3);
-
-    IgnoredException.addIgnoredException("Connection reset");
-  }
+  @Before
+  public void before() throws Exception {
+    this.hostName = getServerHostName(getHost(0));
 
-  @Override
-  public final void preTearDown() throws Exception {
-    reset();
-    helper.closeCache(managingNode);
-    helper.closeCache(client);
-    helper.closeCache(client2);
-    helper.closeCache(server);
+    this.managerVM = getHost(0).getVM(0);
+    this.serverVM = getHost(0).getVM(1);
+    this.client1VM = getHost(0).getVM(2);
+    this.client2VM = getHost(0).getVM(3);
 
-    disconnectAllFromDS();
+    addIgnoredException("Connection reset");
   }
 
-  private static void reset() throws Exception {
-    lastKeyReceived = false;
-    numOfCreates = 0;
-    numOfUpdates = 0;
-    numOfInvalidates = 0;
+  @After
+  public void after() throws Exception {
+    invokeInEveryVM(() -> {
+      lastKeyReceived = false;
+      numOfCreates = 0;
+      numOfUpdates = 0;
+      numOfInvalidates = 0;
+      clientCache = null;
+    });
   }
 
   @Test
   public void testClientHealthStats_SubscriptionEnabled() throws Exception {
-    helper.createManagementCache(managingNode);
-    helper.startManagingNode(managingNode);
-
-    int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache());
+    this.managementTestRule.createManager(this.managerVM, false);
+    this.managementTestRule.startManager(this.managerVM);
 
-    DistributedMember serverMember = helper.getMember(server);
+    int port = this.serverVM.invoke(() -> createServerCache());
 
-    client.invoke(
-        () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false));
+    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
+    this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, true));
 
-    client2.invoke(
-        () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2, true, false));
+    this.client1VM.invoke(() -> put());
+    this.client2VM.invoke(() -> put());
 
-    client.invoke(() -> ClientHealthStatsDUnitTest.put());
-    client2.invoke(() -> ClientHealthStatsDUnitTest.put());
+    DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
+    this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2));
 
-    managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 2));
-    helper.stopManagingNode(managingNode);
+    this.managementTestRule.stopManager(this.managerVM);
   }
 
   @Test
   public void testClientHealthStats_SubscriptionDisabled() throws Exception {
-    helper.createManagementCache(managingNode);
-    helper.startManagingNode(managingNode);
-
-    int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache());
-
-    DistributedMember serverMember = helper.getMember(server);
+    this.managementTestRule.createManager(this.managerVM, false);
+    this.managementTestRule.startManager(this.managerVM);
 
-    client.invoke(() -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1,
-        false, false));
+    int port = this.serverVM.invoke(() -> createServerCache());
 
-    client2.invoke(() -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2,
-        false, false));
+    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, false));
+    this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, false));
 
-    client.invoke(() -> ClientHealthStatsDUnitTest.put());
-    client2.invoke(() -> ClientHealthStatsDUnitTest.put());
+    this.client1VM.invoke(() -> put());
+    this.client2VM.invoke(() -> put());
 
-    managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 0));
-    helper.stopManagingNode(managingNode);
+    DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
+    this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 0));
+    this.managementTestRule.stopManager(this.managerVM);
   }
 
   @Test
   public void testClientHealthStats_DurableClient() throws Exception {
-    helper.createManagementCache(managingNode);
-    helper.startManagingNode(managingNode);
+    this.managementTestRule.createManager(this.managerVM, false);
+    this.managementTestRule.startManager(this.managerVM);
 
-    int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache());
+    int port = this.serverVM.invoke(() -> createServerCache());
 
-    DistributedMember serverMember = helper.getMember(server);
+    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
+    this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, true));
 
-    client.invoke(
-        () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, true));
+    this.client1VM.invoke(() -> put());
+    this.client2VM.invoke(() -> put());
 
-    client2.invoke(
-        () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2, true, true));
+    this.client1VM.invoke(() -> clientCache.close(true));
+    this.client2VM.invoke(() -> clientCache.close(true));
 
-    client.invoke(() -> ClientHealthStatsDUnitTest.put());
-    client2.invoke(() -> ClientHealthStatsDUnitTest.put());
-
-    client.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache());
-
-    client2.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache());
-
-    managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 2));
-    helper.stopManagingNode(managingNode);
+    DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
+    this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2));
+    this.managementTestRule.stopManager(this.managerVM);
   }
 
-  @Category(FlakyTest.class) // GEODE-337
   @Test
   public void testStatsMatchWithSize() throws Exception {
-    // start a server
-    int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache());
-    // create durable client, with durable RI
-    client.invoke(
-        () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false));
-    // do puts on server from three different threads, pause after 500 puts each.
-    server.invoke(() -> ClientHealthStatsDUnitTest.doPuts());
-    // close durable client
-    client.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache());
-
-    server.invoke("verifyProxyHasBeenPaused", () -> verifyProxyHasBeenPaused());
-    // resume puts on server, add another 100.
-    server.invokeAsync(() -> ClientHealthStatsDUnitTest.resumePuts());
-    // start durable client
-    client.invoke(
-        () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false));
-    // wait for full queue dispatch
-    client.invoke(() -> ClientHealthStatsDUnitTest.waitForLastKey());
-    // verify the stats
-    server.invoke(() -> ClientHealthStatsDUnitTest.verifyStats(port));
-  }
+    // start a cache server in serverVM
+    int port = this.serverVM.invoke(() -> createServerCache());
 
-  private static void verifyProxyHasBeenPaused() {
+    // create a durable client cache in client1VM, with durable register-interest
+    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
 
-    WaitCriterion criterion = new WaitCriterion() {
+    // do puts on serverVM from three different threads, NUMBER_PUTS puts each.
+    this.serverVM.invoke(() -> doPuts());
 
-      @Override
-      public boolean done() {
-        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
-        Collection<CacheClientProxy> ccProxies = ccn.getClientProxies();
+    // close the durable client cache in client1VM (keepalive = true)
+    this.client1VM.invoke(() -> clientCache.close(true));
 
-        Iterator<CacheClientProxy> itr = ccProxies.iterator();
+    this.serverVM.invoke(() -> await().until(() -> cacheClientProxyHasBeenPause()));
 
-        while (itr.hasNext()) {
-          CacheClientProxy ccp = itr.next();
-          System.out.println("proxy status " + ccp.getState());
-          if (ccp.isPaused())
-            return true;
-        }
-        return false;
-      }
+    // resume puts on serverVM, add another 100.
+    this.serverVM.invoke(() -> resumePuts());
 
-      @Override
-      public String description() {
-        return "Proxy has not paused yet";
-      }
-    };
+    // re-create the durable client cache in client1VM
+    this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
 
-    Wait.waitForCriterion(criterion, 15 * 1000, 200, true);
+    // wait for full queue dispatch
+    this.client1VM.invoke(() -> await().until(() -> lastKeyReceived));
+
+    // verify the stats
+    this.serverVM.invoke(() -> verifyStats(port));
   }
 
-  private static int createServerCache() throws Exception {
-    Cache cache = helper.createCache(false);
+  /**
+   * Invoked in serverVM
+   */
+  private boolean cacheClientProxyHasBeenPause() {
+    CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+    // TODO: CacheClientNotifier clientNotifier =
+    // ((CacheServerImpl)this.managementTestRule.getCache().getCacheServers().get(0)).getAcceptor().getCacheClientNotifier();
 
-    RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
-    rf.setConcurrencyChecksEnabled(false);
-    rf.create(REGION_NAME);
+    Collection<CacheClientProxy> clientProxies = clientNotifier.getClientProxies();
 
-    CacheServer server1 = cache.addCacheServer();
-    server1.setPort(0);
-    server1.start();
-    return server1.getPort();
+    for (CacheClientProxy clientProxy : clientProxies) {
+      if (clientProxy.isPaused()) {
+        return true;
+      }
+    }
+    return false;
   }
 
-  private static void closeClientCache() throws Exception {
-    cache.close(true);
+  /**
+   * Invoked in serverVM
+   */
+  private int createServerCache() throws IOException {
+    Cache cache = this.managementTestRule.getCache();
+
+    RegionFactory<String, String> regionFactory =
+        cache.createRegionFactory(RegionShortcut.REPLICATE);
+    regionFactory.setConcurrencyChecksEnabled(false);
+    regionFactory.create(REGION_NAME);
+
+    CacheServer cacheServer = cache.addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
+    return cacheServer.getPort();
   }
 
-  private static void createClientCache(Host host, Integer port, int clientNum,
-      boolean subscriptionEnabled, boolean durable) throws Exception {
+  /**
+   * Invoked in client1VM and client2VM
+   */
+  private void createClientCache(final String hostName, final Integer port, final int clientNum,
+      final boolean subscriptionEnabled) {
     Properties props = new Properties();
-    props.setProperty(DURABLE_CLIENT_ID, "durable-" + clientNum);
-    props.setProperty(DURABLE_CLIENT_TIMEOUT, "300000");
-    props.setProperty(LOG_LEVEL, "info");
-    props.setProperty(STATISTIC_ARCHIVE_FILE,
-        getTestMethodName() + "_client_" + clientNum + ".gfs");
     props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
 
-    ClientCacheFactory ccf = new ClientCacheFactory(props);
+    ClientCacheFactory cacheFactory = new ClientCacheFactory(props);
     if (subscriptionEnabled) {
-      ccf.setPoolSubscriptionEnabled(true);
-      ccf.setPoolSubscriptionAckInterval(50);
-      ccf.setPoolSubscriptionRedundancy(0);
+      cacheFactory.setPoolSubscriptionEnabled(true);
+      cacheFactory.setPoolSubscriptionAckInterval(50);
+      cacheFactory.setPoolSubscriptionRedundancy(0);
     }
 
-    if (durable) {
-      ccf.set(DURABLE_CLIENT_ID, "DurableClientId_" + clientNum);
-      ccf.set(DURABLE_CLIENT_TIMEOUT, "" + 300);
-    }
+    cacheFactory.set(DURABLE_CLIENT_ID, "DurableClientId_" + clientNum);
+    cacheFactory.set(DURABLE_CLIENT_TIMEOUT, "" + 30000);
 
-    ccf.addPoolServer(host.getHostName(), port);
-    cache = (GemFireCacheImpl) ccf.create();
+    cacheFactory.addPoolServer(hostName, port);
+    clientCache = cacheFactory.create();
 
-    ClientRegionFactory<String, String> crf =
-        cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
-    crf.setConcurrencyChecksEnabled(false);
+    ClientRegionFactory<String, String> regionFactory =
+        clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+    regionFactory.setConcurrencyChecksEnabled(false);
 
-    crf.addCacheListener(new CacheListenerAdapter<String, String>() {
-      public void afterInvalidate(EntryEvent<String, String> event) {
-        cache.getLoggerI18n()
-            .fine("Invalidate Event: " + event.getKey() + ", " + event.getNewValue());
+    regionFactory.addCacheListener(new CacheListenerAdapter<String, String>() {
+      @Override
+      public void afterInvalidate(final EntryEvent<String, String> event) {
         numOfInvalidates++;
       }
 
-      public void afterCreate(EntryEvent<String, String> event) {
-        if (((String) event.getKey()).equals("last_key")) {
+      @Override
+      public void afterCreate(final EntryEvent<String, String> event) {
+        if ("last_key".equals(event.getKey())) {
           lastKeyReceived = true;
         }
-        cache.getLoggerI18n().fine("Create Event: " + event.getKey() + ", " + event.getNewValue());
         numOfCreates++;
       }
 
-      public void afterUpdate(EntryEvent<String, String> event) {
-        cache.getLoggerI18n().fine("Update Event: " + event.getKey() + ", " + event.getNewValue());
+      @Override
+      public void afterUpdate(final EntryEvent<String, String> event) {
         numOfUpdates++;
       }
     });
 
-    Region<String, String> r = crf.create(REGION_NAME);
+    Region<String, String> region = regionFactory.create(REGION_NAME);
     if (subscriptionEnabled) {
-      r.registerInterest("ALL_KEYS", true);
-      cache.readyForEvents();
+      region.registerInterest("ALL_KEYS", true);
+      clientCache.readyForEvents();
     }
   }
 
-  private static void doPuts() throws Exception {
-    Cache cache = GemFireCacheImpl.getInstance();
-    final Region<String, String> r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
-    Thread t1 = new Thread(new Runnable() {
-      public void run() {
-        for (int i = 0; i < 500; i++) {
-          r.put("T1_KEY_" + i, "VALUE_" + i);
-        }
+  /**
+   * Invoked in serverVM
+   */
+  private void doPuts() throws InterruptedException {
+    Cache cache = this.managementTestRule.getCache();
+    Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+
+    Thread thread1 = new Thread(() -> {
+      for (int i = 0; i < NUMBER_PUTS; i++) {
+        region.put("T1_KEY_" + i, "VALUE_" + i);
       }
     });
-    Thread t2 = new Thread(new Runnable() {
-      public void run() {
-        for (int i = 0; i < 500; i++) {
-          r.put("T2_KEY_" + i, "VALUE_" + i);
-        }
+    Thread thread2 = new Thread(() -> {
+      for (int i = 0; i < NUMBER_PUTS; i++) {
+        region.put("T2_KEY_" + i, "VALUE_" + i);
       }
     });
-    Thread t3 = new Thread(new Runnable() {
-      public void run() {
-        for (int i = 0; i < 500; i++) {
-          r.put("T3_KEY_" + i, "VALUE_" + i);
-        }
+    Thread thread3 = new Thread(() -> {
+      for (int i = 0; i < NUMBER_PUTS; i++) {
+        region.put("T3_KEY_" + i, "VALUE_" + i);
       }
     });
 
-    t1.start();
-    t2.start();
-    t3.start();
+    thread1.start();
+    thread2.start();
+    thread3.start();
 
-    t1.join();
-    t2.join();
-    t3.join();
+    thread1.join();
+    thread2.join();
+    thread3.join();
   }
 
-  private static void resumePuts() {
-    Cache cache = GemFireCacheImpl.getInstance();
-    Region<String, String> r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
-    for (int i = 0; i < 100; i++) {
-      r.put("NEWKEY_" + i, "NEWVALUE_" + i);
+  /**
+   * Invoked in serverVM
+   */
+  private void resumePuts() {
+    Cache cache = this.managementTestRule.getCache();
+    Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+
+    for (int i = 0; i < NUMBER_PUTS; i++) {
+      region.put("NEWKEY_" + i, "NEWVALUE_" + i);
     }
-    r.put("last_key", "last_value");
+    region.put("last_key", "last_value");
   }
 
-  private static void waitForLastKey() {
-    WaitCriterion wc = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return lastKeyReceived;
-      }
+  /**
+   * Invoked in managerVM
+   */
+  private void verifyClientStats(final DistributedMember serverMember, final int serverPort,
+      final int numSubscriptions) throws Exception {
+    ManagementService service = this.managementTestRule.getManagementService();
+    CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember, serverPort);
 
-      @Override
-      public String description() {
-        return "Did not receive last key.";
-      }
-    };
-    Wait.waitForCriterion(wc, 60 * 1000, 500, true);
-  }
+    String[] clientIds = cacheServerMXBean.getClientIds();
+    assertThat(clientIds).hasSize(2);
 
-  private static DistributedMember getMember() throws Exception {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-    return cache.getDistributedSystem().getDistributedMember();
-  }
+    ClientHealthStatus[] clientStatuses = cacheServerMXBean.showAllClientStats();
 
-  private static void verifyClientStats(DistributedMember serverMember, int serverPort,
-      int numSubscriptions) {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-    try {
-      ManagementService service = ManagementService.getExistingManagementService(cache);
-      CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, serverPort);
-
-      String[] clientIds = bean.getClientIds();
-      assertTrue(clientIds.length == 2);
-      System.out.println(
-          "<ExpectedString> ClientId-1 of the Server is  " + clientIds[0] + "</ExpectedString> ");
-      System.out.println(
-          "<ExpectedString> ClientId-2 of the Server is  " + clientIds[1] + "</ExpectedString> ");
-
-      ClientHealthStatus[] clientStatuses = bean.showAllClientStats();
-
-      ClientHealthStatus clientStatus1 = bean.showClientStats(clientIds[0]);
-      ClientHealthStatus clientStatus2 = bean.showClientStats(clientIds[1]);
-      assertNotNull(clientStatus1);
-      assertNotNull(clientStatus2);
-      System.out.println("<ExpectedString> ClientStats-1 of the Server is  " + clientStatus1
-          + "</ExpectedString> ");
-      System.out.println("<ExpectedString> ClientStats-2 of the Server is  " + clientStatus2
-          + "</ExpectedString> ");
-
-      System.out
-          .println("<ExpectedString> clientStatuses " + clientStatuses + "</ExpectedString> ");
-      assertNotNull(clientStatuses);
-
-      assertTrue(clientStatuses.length == 2);
-      for (ClientHealthStatus status : clientStatuses) {
-        System.out.println(
-            "<ExpectedString> ClientStats of the Server is  " + status + "</ExpectedString> ");
-      }
+    ClientHealthStatus clientStatus1 = cacheServerMXBean.showClientStats(clientIds[0]);
+    ClientHealthStatus clientStatus2 = cacheServerMXBean.showClientStats(clientIds[1]);
+    assertThat(clientStatus1).isNotNull();
+    assertThat(clientStatus2).isNotNull();
 
-      DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
-      assertEquals(2, dsBean.getNumClients());
-      assertEquals(numSubscriptions, dsBean.getNumSubscriptions());
+    assertThat(clientStatuses).isNotNull().hasSize(2);
 
-    } catch (Exception e) {
-      fail("Error while verifying cache server from remote member", e);
-    }
+    DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
+    assertThat(dsBean.getNumClients()).isEqualTo(2);
+    assertThat(dsBean.getNumSubscriptions()).isEqualTo(numSubscriptions);
   }
 
-  private static void put() {
-    Cache cache = GemFireCacheImpl.getInstance();
-    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
-    assertNotNull(r1);
-
-    r1.put(k1, client_k1);
-    assertEquals(r1.getEntry(k1).getValue(), client_k1);
-    r1.put(k2, client_k2);
-    assertEquals(r1.getEntry(k2).getValue(), client_k2);
-    try {
-      Thread.sleep(10000);
-    } catch (Exception e) {
-      // sleep
-    }
-    r1.clear();
-
-    r1.put(k1, client_k1);
-    assertEquals(r1.getEntry(k1).getValue(), client_k1);
-    r1.put(k2, client_k2);
-    assertEquals(r1.getEntry(k2).getValue(), client_k2);
-    r1.clear();
-    try {
-      Thread.sleep(10000);
-    } catch (Exception e) {
-      // sleep
-    }
+  /**
+   * Invoked in client1VM and client2VM
+   */
+  private void put() {
+    Cache cache = (Cache) clientCache;
+    Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+
+    region.put(KEY1, VALUE1);
+    assertThat(region.getEntry(KEY1).getValue()).isEqualTo(VALUE1);
+
+    region.put(KEY2, VALUE2);
+    assertThat(region.getEntry(KEY2).getValue()).isEqualTo(VALUE2);
+
+    region.clear();
+
+    region.put(KEY1, VALUE1);
+    assertThat(region.getEntry(KEY1).getValue()).isEqualTo(VALUE1);
+
+    region.put(KEY2, VALUE2);
+    assertThat(region.getEntry(KEY2).getValue()).isEqualTo(VALUE2);
+
+    region.clear();
   }
 
-  private static void verifyStats(int serverPort) throws Exception {
-    Cache cache = GemFireCacheImpl.getInstance();
-    ManagementService service = ManagementService.getExistingManagementService(cache);
+  /**
+   * Invoked in serverVM
+   */
+  private void verifyStats(final int serverPort) throws Exception {
+    ManagementService service = this.managementTestRule.getManagementService();
     CacheServerMXBean serverBean = service.getLocalCacheServerMXBean(serverPort);
-    CacheClientNotifier ccn = CacheClientNotifier.getInstance();
-    CacheClientProxy ccp = ccn.getClientProxies().iterator().next();
-    cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getQueueSize() " + ccp.getQueueSize());
-    cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
-        "getQueueSizeStat() " + ccp.getQueueSizeStat());
-    cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
-        "getEventsEnqued() " + ccp.getHARegionQueue().getStatistics().getEventsEnqued());
-    cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
-        "getEventsDispatched() " + ccp.getHARegionQueue().getStatistics().getEventsDispatched());
-    cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
-        "getEventsRemoved() " + ccp.getHARegionQueue().getStatistics().getEventsRemoved());
-    cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
-        "getNumVoidRemovals() " + ccp.getHARegionQueue().getStatistics().getNumVoidRemovals());
-    assertEquals(ccp.getQueueSize(), ccp.getQueueSizeStat());
+
+    CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+    CacheClientProxy clientProxy = clientNotifier.getClientProxies().iterator().next();
+    assertThat(clientProxy.getQueueSizeStat()).isEqualTo(clientProxy.getQueueSize());
+
     ClientQueueDetail queueDetails = serverBean.showClientQueueDetails()[0];
-    assertEquals(queueDetails.getQueueSize(), ccp.getQueueSizeStat());
+    assertThat(clientProxy.getQueueSizeStat()).isEqualTo((int) queueDetails.getQueueSize());
+  }
+
+  private CacheServerMXBean awaitCacheServerMXBean(final DistributedMember serverMember,
+      final int port) {
+    SystemManagementService service = this.managementTestRule.getSystemManagementService();
+    ObjectName objectName = service.getCacheServerMBeanName(port, serverMember);
+
+    await().until(
+        () -> assertThat(service.getMBeanProxy(objectName, CacheServerMXBean.class)).isNotNull());
+
+    return service.getMBeanProxy(objectName, CacheServerMXBean.class);
+  }
+
+  private ConditionFactory await() {
+    return Awaitility.await().atMost(2, MINUTES);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/8f48a585/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java
index 9c33003..33bbc08 100644
--- a/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java
@@ -14,163 +14,89 @@
  */
 package org.apache.geode.management;
 
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
+import static java.util.concurrent.TimeUnit.*;
+import static org.assertj.core.api.Assertions.*;
 
-import static org.junit.Assert.*;
+import java.io.Serializable;
 
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
-
-import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.management.internal.MBeanJMXAdapter;
-import org.apache.geode.management.internal.ManagementConstants;
 import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.junit.categories.DistributedTest;
 
 @Category(DistributedTest.class)
-public class CompositeTypeTestDUnitTest extends ManagementTestBase {
+@SuppressWarnings({"serial", "unused"})
+public class CompositeTypeTestDUnitTest implements Serializable {
 
-  public CompositeTypeTestDUnitTest() {
-    super();
-    // TODO Auto-generated constructor stub
-  }
+  @Manager
+  private VM managerVM;
 
-  /**
-   * 
-   */
-  private static final long serialVersionUID = 1L;
+  @Member
+  private VM memberVM;
 
-  private static ObjectName objectName;
+  @Rule
+  public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(true).build();
 
-  @Category(FlakyTest.class) // GEODE-1492
   @Test
   public void testCompositeTypeGetters() throws Exception {
+    registerMBeanWithCompositeTypeGetters(this.memberVM);
 
-    initManagement(false);
-    String member = getMemberId(managedNode1);
-    member = MBeanJMXAdapter.makeCompliantName(member);
-
-    registerMBeanWithCompositeTypeGetters(managedNode1, member);
+    String memberName = MBeanJMXAdapter.makeCompliantName(getMemberId(this.memberVM));
+    verifyMBeanWithCompositeTypeGetters(this.managerVM, memberName);
+  }
 
+  private void registerMBeanWithCompositeTypeGetters(final VM memberVM) {
+    memberVM.invoke("registerMBeanWithCompositeTypeGetters", () -> {
+      SystemManagementService service = this.managementTestRule.getSystemManagementService();
 
-    checkMBeanWithCompositeTypeGetters(managingNode, member);
+      ObjectName objectName = new ObjectName("GemFire:service=custom,type=composite");
+      CompositeTestMXBean compositeTestMXBean = new CompositeTestMBean();
 
+      objectName = service.registerMBean(compositeTestMXBean, objectName);
+      service.federate(objectName, CompositeTestMXBean.class, false);
+    });
   }
 
+  private void verifyMBeanWithCompositeTypeGetters(final VM managerVM, final String memberId) {
+    managerVM.invoke("verifyMBeanWithCompositeTypeGetters", () -> {
+      SystemManagementService service = this.managementTestRule.getSystemManagementService();
+      ObjectName objectName =
+          new ObjectName("GemFire:service=custom,type=composite,member=" + memberId);
 
-  /**
-   * Creates a Local region
-   *
-   * @param vm reference to VM
-   */
-  protected void registerMBeanWithCompositeTypeGetters(VM vm, final String memberID)
-      throws Exception {
-    SerializableRunnable regMBean =
-        new SerializableRunnable("Register CustomMBean with composite Type") {
-          public void run() {
-            GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-            SystemManagementService service = (SystemManagementService) getManagementService();
-
-            try {
-              ObjectName objectName = new ObjectName("GemFire:service=custom,type=composite");
-              CompositeTestMXBean mbean = new CompositeTestMBean();
-              objectName = service.registerMBean(mbean, objectName);
-              service.federate(objectName, CompositeTestMXBean.class, false);
-            } catch (MalformedObjectNameException e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
-            } catch (NullPointerException e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
-            }
-
-
-
-          }
-        };
-    vm.invoke(regMBean);
-  }
+      await().until(() -> service.getMBeanInstance(objectName, CompositeTestMXBean.class) != null);
+
+      CompositeTestMXBean compositeTestMXBean =
+          service.getMBeanInstance(objectName, CompositeTestMXBean.class);
+      assertThat(compositeTestMXBean).isNotNull();
 
+      CompositeStats listCompositeStatsData = compositeTestMXBean.listCompositeStats();
+      assertThat(listCompositeStatsData).isNotNull();
 
-  /**
-   * Creates a Local region
-   *
-   * @param vm reference to VM
-   */
-  protected void checkMBeanWithCompositeTypeGetters(VM vm, final String memberID) throws Exception {
-    SerializableRunnable checkMBean =
-        new SerializableRunnable("Check CustomMBean with composite Type") {
-          public void run() {
-            GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-            final SystemManagementService service =
-                (SystemManagementService) getManagementService();
-
-            try {
-              final ObjectName objectName =
-                  new ObjectName("GemFire:service=custom,type=composite,member=" + memberID);
-
-              Wait.waitForCriterion(new WaitCriterion() {
-                public String description() {
-                  return "Waiting for Composite Type MBean";
-                }
-
-                public boolean done() {
-                  CompositeTestMXBean bean =
-                      service.getMBeanInstance(objectName, CompositeTestMXBean.class);
-                  boolean done = (bean != null);
-                  return done;
-                }
-
-              }, ManagementConstants.REFRESH_TIME * 4, 500, true);
-
-
-              CompositeTestMXBean bean =
-                  service.getMBeanInstance(objectName, CompositeTestMXBean.class);
-
-              CompositeStats listData = bean.listCompositeStats();
-
-              System.out.println("connectionStatsType = " + listData.getConnectionStatsType());
-              System.out.println("connectionsOpened = " + listData.getConnectionsOpened());
-              System.out.println("connectionsClosed = " + listData.getConnectionsClosed());
-              System.out.println("connectionsAttempted = " + listData.getConnectionsAttempted());
-              System.out.println("connectionsFailed = " + listData.getConnectionsFailed());
-
-              CompositeStats getsData = bean.getCompositeStats();
-              System.out.println("connectionStatsType = " + getsData.getConnectionStatsType());
-              System.out.println("connectionsOpened = " + getsData.getConnectionsOpened());
-              System.out.println("connectionsClosed = " + getsData.getConnectionsClosed());
-              System.out.println("connectionsAttempted = " + getsData.getConnectionsAttempted());
-              System.out.println("connectionsFailed = " + getsData.getConnectionsFailed());
-
-              CompositeStats[] arrayData = bean.getCompositeArray();
-              Integer[] intArrayData = bean.getIntegerArray();
-              Thread.sleep(2 * 60 * 1000);
-            } catch (MalformedObjectNameException e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
-            } catch (NullPointerException e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
-            } catch (InterruptedException e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
-            }
-
-
-
-          }
-        };
-    vm.invoke(checkMBean);
+      CompositeStats getCompositeStatsData = compositeTestMXBean.getCompositeStats();
+      assertThat(getCompositeStatsData).isNotNull();
+
+      CompositeStats[] getCompositeArrayData = compositeTestMXBean.getCompositeArray();
+      assertThat(getCompositeArrayData).isNotNull().isNotEmpty();
+
+      Integer[] getIntegerArrayData = compositeTestMXBean.getIntegerArray();
+      assertThat(getIntegerArrayData).isNotNull().isNotEmpty();
+    });
   }
 
+  private String getMemberId(final VM memberVM) {
+    return memberVM.invoke("getMemberId",
+        () -> this.managementTestRule.getDistributedMember().getId());
+  }
 
+  private ConditionFactory await() {
+    return Awaitility.await().atMost(2, MINUTES);
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/8f48a585/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java
index 86501c3..1193f6b 100644
--- a/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java
@@ -14,452 +14,291 @@
  */
 package org.apache.geode.management;
 
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.geode.internal.process.ProcessUtils.*;
+import static org.apache.geode.management.internal.MBeanJMXAdapter.*;
+import static org.assertj.core.api.Assertions.*;
 
+import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
 
 import javax.management.ObjectName;
 
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.locks.DLockService;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.management.internal.MBeanJMXAdapter;
 import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.FlakyTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
 
 @Category(DistributedTest.class)
-public class DLockManagementDUnitTest extends ManagementTestBase {
+@SuppressWarnings({"serial", "unused"})
+public class DLockManagementDUnitTest implements Serializable {
 
-  private static final long serialVersionUID = 1L;
+  private static final int MAX_WAIT_MILLIS = 120 * 1000; // 2 MINUTES
 
-  private static final String LOCK_SERVICE_NAME = "testLockService";
+  private static final String LOCK_SERVICE_NAME =
+      DLockManagementDUnitTest.class.getSimpleName() + "_testLockService";
 
-  // This must be bigger than the dunit ack-wait-threshold for the revoke
-  // tests. The command line is setting the ack-wait-threshold to be
-  // 60 seconds.
-  private static final int MAX_WAIT = 70 * 1000;
+  @Member
+  private VM[] memberVMs;
 
-  public DLockManagementDUnitTest() {
-    super();
+  @Manager
+  private VM managerVM;
 
-  }
+  @Rule
+  public ManagementTestRule managementTestRule =
+      ManagementTestRule.builder().defineManagersFirst(false).start(true).build();
 
-  /**
-   * Distributed Lock Service test
-   * 
-   * @throws Exception
-   */
-  @Category(FlakyTest.class) // GEODE-173: eats exceptions, HeadlessGFSH, time sensitive,
-                             // waitForCriterions
   @Test
-  public void testDLockMBean() throws Throwable {
-
-    initManagement(false);
+  public void testLockServiceMXBean() throws Throwable {
+    createLockServiceGrantor(this.memberVMs[0]);
+    createLockService(this.memberVMs[1]);
+    createLockService(this.memberVMs[2]);
 
-    VM[] managedNodes = new VM[getManagedNodeList().size()];
-    VM managingNode = getManagingNode();
-
-    getManagedNodeList().toArray(managedNodes);
-
-    createGrantorLockService(managedNodes[0]);
-
-    createLockService(managedNodes[1]);
-
-    createLockService(managedNodes[2]);
-
-    for (VM vm : getManagedNodeList()) {
-      verifyLockData(vm);
+    for (VM memberVM : this.memberVMs) {
+      verifyLockServiceMXBeanInMember(memberVM);
     }
-    verifyLockDataRemote(managingNode);
+    verifyLockServiceMXBeanInManager(this.managerVM);
 
-    for (VM vm : getManagedNodeList()) {
-      closeLockService(vm);
+    for (VM memberVM : this.memberVMs) {
+      closeLockService(memberVM);
     }
   }
 
-  /**
-   * Distributed Lock Service test
-   * 
-   * @throws Exception
-   */
-  @Category(FlakyTest.class) // GEODE-553: waitForCriterion, eats exceptions, HeadlessGFSH
   @Test
-  public void testDLockAggregate() throws Throwable {
-    initManagement(false);
-    VM[] managedNodes = new VM[getManagedNodeList().size()];
-    VM managingNode = getManagingNode();
-
-    getManagedNodeList().toArray(managedNodes);
-
-    createGrantorLockService(managedNodes[0]);
-
-    createLockService(managedNodes[1]);
+  public void testDistributedLockServiceMXBean() throws Throwable {
+    createLockServiceGrantor(this.memberVMs[0]);
+    createLockService(this.memberVMs[1]);
+    createLockService(this.memberVMs[2]);
 
-    createLockService(managedNodes[2]);
+    verifyDistributedLockServiceMXBean(this.managerVM, 3);
 
-    checkAggregate(managingNode, 3);
-    DistributedMember member = getMember(managedNodes[2]);
-    checkNavigation(managingNode, member);
+    DistributedMember member = this.managementTestRule.getDistributedMember(this.memberVMs[2]);
+    verifyFetchOperations(this.managerVM, member);
 
-    createLockService(managingNode);
-    checkAggregate(managingNode, 4);
+    createLockService(this.managerVM);
+    verifyDistributedLockServiceMXBean(this.managerVM, 4);
 
-
-    for (VM vm : getManagedNodeList()) {
-      closeLockService(vm);
+    for (VM memberVM : this.memberVMs) {
+      closeLockService(memberVM);
     }
-    ensureProxyCleanup(managingNode);
-    checkAggregate(managingNode, 1);
-    closeLockService(managingNode);
-    checkAggregate(managingNode, 0);
+    verifyProxyCleanupInManager(this.managerVM);
+    verifyDistributedLockServiceMXBean(this.managerVM, 1);
 
+    closeLockService(this.managerVM);
+    verifyDistributedLockServiceMXBean(this.managerVM, 0);
   }
 
-  public void ensureProxyCleanup(final VM vm) {
-
-    SerializableRunnable ensureProxyCleanup = new SerializableRunnable("Ensure Proxy cleanup") {
-      public void run() {
-        GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-        Set<DistributedMember> otherMemberSet =
-            cache.getDistributionManager().getOtherNormalDistributionManagerIds();
-        final SystemManagementService service = (SystemManagementService) getManagementService();
-
-
-        for (final DistributedMember member : otherMemberSet) {
-          RegionMXBean bean = null;
-          try {
-
-            Wait.waitForCriterion(new WaitCriterion() {
-
-              LockServiceMXBean bean = null;
-
-              public String description() {
-                return "Waiting for the proxy to get deleted at managing node";
-              }
-
-              public boolean done() {
-                ObjectName objectName = service.getRegionMBeanName(member, LOCK_SERVICE_NAME);
-                bean = service.getMBeanProxy(objectName, LockServiceMXBean.class);
-                boolean done = (bean == null);
-                return done;
-              }
-
-            }, MAX_WAIT, 500, true);
-
-          } catch (Exception e) {
-            throw new AssertionError("could not remove proxies in required time", e);
-
-          }
-          assertNull(bean);
-
-        }
+  private void verifyProxyCleanupInManager(final VM managerVM) {
+    managerVM.invoke("verifyProxyCleanupInManager", () -> {
+      Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers();
+      SystemManagementService service = this.managementTestRule.getSystemManagementService();
 
+      for (final DistributedMember member : otherMembers) {
+        ObjectName objectName = service.getRegionMBeanName(member, LOCK_SERVICE_NAME);
+        await().until(() -> assertThat(lockServiceMXBeanIsGone(service, objectName)).isTrue());
       }
-    };
-    vm.invoke(ensureProxyCleanup);
+    });
   }
 
-  /**
-   * Creates a grantor lock service
-   * 
-   * @param vm
-   */
-  @SuppressWarnings("serial")
-  protected void createGrantorLockService(final VM vm) {
-    SerializableRunnable createGrantorLockService =
-        new SerializableRunnable("Create Grantor LockService") {
-          public void run() {
-            GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-            assertNull(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME));
+  private boolean lockServiceMXBeanIsGone(final SystemManagementService service,
+      final ObjectName objectName) {
+    return service.getMBeanProxy(objectName, LockServiceMXBean.class) == null;
+  }
 
-            DLockService service = (DLockService) DistributedLockService.create(LOCK_SERVICE_NAME,
-                cache.getDistributedSystem());
+  private void createLockServiceGrantor(final VM memberVM) {
+    memberVM.invoke("createLockServiceGrantor", () -> {
+      assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNull();
 
-            assertSame(service, DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME));
+      DLockService lockService = (DLockService) DistributedLockService.create(LOCK_SERVICE_NAME,
+          this.managementTestRule.getCache().getDistributedSystem());
+      DistributedMember grantor = lockService.getLockGrantorId().getLockGrantorMember();
+      assertThat(grantor).isNotNull();
 
-            InternalDistributedMember grantor = service.getLockGrantorId().getLockGrantorMember();
+      LockServiceMXBean lockServiceMXBean = awaitLockServiceMXBean(LOCK_SERVICE_NAME);
 
-            assertNotNull(grantor);
+      assertThat(lockServiceMXBean).isNotNull();
+      assertThat(lockServiceMXBean.isDistributed()).isTrue();
+      assertThat(lockServiceMXBean.getName()).isEqualTo(LOCK_SERVICE_NAME);
+      assertThat(lockServiceMXBean.isLockGrantor()).isTrue();
+      assertThat(lockServiceMXBean.fetchGrantorMember())
+          .isEqualTo(this.managementTestRule.getDistributedMember().getId());
+    });
+  }
 
-            LogWriterUtils.getLogWriter().info("In identifyLockGrantor - grantor is " + grantor);
+  private void createLockService(final VM anyVM) {
+    anyVM.invoke("createLockService", () -> {
+      assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNull();
 
+      DistributedLockService.create(LOCK_SERVICE_NAME,
+          this.managementTestRule.getCache().getDistributedSystem());
 
+      LockServiceMXBean lockServiceMXBean = awaitLockServiceMXBean(LOCK_SERVICE_NAME);
 
-            ManagementService mgmtService = getManagementService();
+      assertThat(lockServiceMXBean).isNotNull();
+      assertThat(lockServiceMXBean.isDistributed()).isTrue();
+      assertThat(lockServiceMXBean.isLockGrantor()).isFalse();
+    });
+  }
 
-            LockServiceMXBean bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
+  private void closeLockService(final VM anyVM) {
+    anyVM.invoke("closeLockService", () -> {
+      assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNotNull();
+      DistributedLockService.destroy(LOCK_SERVICE_NAME);
 
-            assertNotNull(bean);
+      awaitLockServiceMXBeanIsNull(LOCK_SERVICE_NAME);
 
-            assertTrue(bean.isDistributed());
+      ManagementService service = this.managementTestRule.getManagementService();
+      LockServiceMXBean lockServiceMXBean = service.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
+      assertThat(lockServiceMXBean).isNull();
+    });
+  }
 
-            assertEquals(bean.getName(), LOCK_SERVICE_NAME);
+  private void verifyLockServiceMXBeanInMember(final VM memberVM) {
+    memberVM.invoke("verifyLockServiceMXBeanInManager", () -> {
+      DistributedLockService lockService =
+          DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
+      lockService.lock("lockObject_" + identifyPid(), MAX_WAIT_MILLIS, -1);
 
-            assertTrue(bean.isLockGrantor());
+      ManagementService service = this.managementTestRule.getManagementService();
+      LockServiceMXBean lockServiceMXBean = service.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
+      assertThat(lockServiceMXBean).isNotNull();
 
-            assertEquals(cache.getDistributedSystem().getMemberId(), bean.fetchGrantorMember());
+      String[] listHeldLock = lockServiceMXBean.listHeldLocks();
+      assertThat(listHeldLock).hasSize(1);
 
-          }
-        };
-    vm.invoke(createGrantorLockService);
+      Map<String, String> lockThreadMap = lockServiceMXBean.listThreadsHoldingLock();
+      assertThat(lockThreadMap).hasSize(1);
+    });
   }
 
   /**
-   * Creates a named lock service
-   * 
-   * @param vm
+   * Verify lock data from remote Managing node
    */
-  @SuppressWarnings("serial")
-  protected void createLockService(final VM vm) {
-    SerializableRunnable createLockService = new SerializableRunnable("Create LockService") {
-      public void run() {
-        assertNull(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME));
-        GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-        DistributedLockService service =
-            DistributedLockService.create(LOCK_SERVICE_NAME, cache.getDistributedSystem());
-
-        assertSame(service, DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME));
-
+  private void verifyLockServiceMXBeanInManager(final VM managerVM) throws Exception {
+    managerVM.invoke("verifyLockServiceMXBeanInManager", () -> {
+      Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers();
 
+      for (DistributedMember member : otherMembers) {
+        LockServiceMXBean lockServiceMXBean =
+            awaitLockServiceMXBeanProxy(member, LOCK_SERVICE_NAME);
+        assertThat(lockServiceMXBean).isNotNull();
 
-        ManagementService mgmtService = getManagementService();
+        String[] listHeldLock = lockServiceMXBean.listHeldLocks();
+        assertThat(listHeldLock).hasSize(1);
 
-        LockServiceMXBean bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
-
-        assertNotNull(bean);
-
-        assertTrue(bean.isDistributed());
-
-        assertFalse(bean.isLockGrantor());
+        Map<String, String> lockThreadMap = lockServiceMXBean.listThreadsHoldingLock();
+        assertThat(lockThreadMap).hasSize(1);
       }
-    };
-    vm.invoke(createLockService);
+    });
+  }
+
+  private void verifyFetchOperations(final VM memberVM, final DistributedMember member) {
+    memberVM.invoke("verifyFetchOperations", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+
+      DistributedSystemMXBean distributedSystemMXBean = awaitDistributedSystemMXBean();
+      ObjectName distributedLockServiceMXBeanName =
+          getDistributedLockServiceName(LOCK_SERVICE_NAME);
+      assertThat(distributedSystemMXBean.fetchDistributedLockServiceObjectName(LOCK_SERVICE_NAME))
+          .isEqualTo(distributedLockServiceMXBeanName);
+
+      ObjectName lockServiceMXBeanName = getLockServiceMBeanName(member.getId(), LOCK_SERVICE_NAME);
+      assertThat(
+          distributedSystemMXBean.fetchLockServiceObjectName(member.getId(), LOCK_SERVICE_NAME))
+              .isEqualTo(lockServiceMXBeanName);
+    });
   }
 
   /**
-   * Closes a named lock service
-   * 
-   * @param vm
+   * Verify Aggregate MBean
    */
-  @SuppressWarnings("serial")
-  protected void closeLockService(final VM vm) {
-    SerializableRunnable closeLockService = new SerializableRunnable("Close LockService") {
-      public void run() {
-
-        DistributedLockService service = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
-
-        DistributedLockService.destroy(LOCK_SERVICE_NAME);
-
-        ManagementService mgmtService = getManagementService();
-
-        LockServiceMXBean bean = null;
-        try {
+  private void verifyDistributedLockServiceMXBean(final VM managerVM, final int memberCount) {
+    managerVM.invoke("verifyDistributedLockServiceMXBean", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+
+      if (memberCount == 0) {
+        await().until(
+            () -> assertThat(service.getDistributedLockServiceMXBean(LOCK_SERVICE_NAME)).isNull());
+        return;
+      }
 
-          bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
+      DistributedLockServiceMXBean distributedLockServiceMXBean =
+          awaitDistributedLockServiceMXBean(LOCK_SERVICE_NAME, memberCount);
+      assertThat(distributedLockServiceMXBean).isNotNull();
+      assertThat(distributedLockServiceMXBean.getName()).isEqualTo(LOCK_SERVICE_NAME);
+    });
+  }
 
-        } catch (ManagementException mgs) {
+  private DistributedSystemMXBean awaitDistributedSystemMXBean() {
+    ManagementService service = this.managementTestRule.getManagementService();
 
-        }
-        assertNull(bean);
+    await().until(() -> assertThat(service.getDistributedSystemMXBean()).isNotNull());
 
-      }
-    };
-    vm.invoke(closeLockService);
+    return service.getDistributedSystemMXBean();
   }
 
   /**
-   * Lock data related verifications
-   * 
-   * @param vm
+   * Await and return a DistributedLockServiceMXBean proxy with the specified member count.
    */
-  @SuppressWarnings("serial")
-  protected void verifyLockData(final VM vm) {
-    SerializableRunnable verifyLockData = new SerializableRunnable("Verify LockService") {
-      public void run() {
-
-        DistributedLockService service = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
-
-        final String LOCK_OBJECT = "lockObject_" + vm.getPid();
-
-        Wait.waitForCriterion(new WaitCriterion() {
-          DistributedLockService service = null;
-
-          public String description() {
-            return "Waiting for the lock service to be initialised";
-          }
-
-          public boolean done() {
-            DistributedLockService service =
-                DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
-            boolean done = service != null;
-            return done;
-          }
-
-        }, MAX_WAIT, 500, true);
-
-        service.lock(LOCK_OBJECT, 1000, -1);
-
+  private DistributedLockServiceMXBean awaitDistributedLockServiceMXBean(
+      final String lockServiceName, final int memberCount) {
+    ManagementService service = this.managementTestRule.getManagementService();
 
-        ManagementService mgmtService = getManagementService();
+    await().until(() -> {
+      assertThat(service.getDistributedLockServiceMXBean(lockServiceName)).isNotNull();
+      assertThat(service.getDistributedLockServiceMXBean(lockServiceName).getMemberCount())
+          .isEqualTo(memberCount);
+    });
 
-        LockServiceMXBean bean = null;
-        try {
-
-          bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
-
-        } catch (ManagementException mgs) {
-
-        }
-        assertNotNull(bean);
-        String[] listHeldLock = bean.listHeldLocks();
-        assertEquals(listHeldLock.length, 1);
-        LogWriterUtils.getLogWriter().info("List Of Lock Object is  " + listHeldLock[0]);
-        Map<String, String> lockThreadMap = bean.listThreadsHoldingLock();
-        assertEquals(lockThreadMap.size(), 1);
-        LogWriterUtils.getLogWriter().info("List Of Lock Thread is  " + lockThreadMap.toString());
-      }
-    };
-    vm.invoke(verifyLockData);
+    return service.getDistributedLockServiceMXBean(lockServiceName);
   }
 
   /**
-   * Verify lock data from remote Managing node
-   * 
-   * @param vm
+   * Await and return a LockServiceMXBean proxy for a specific member and lockServiceName.
    */
-  @SuppressWarnings("serial")
-  protected void verifyLockDataRemote(final VM vm) {
-    SerializableRunnable verifyLockDataRemote =
-        new SerializableRunnable("Verify LockService Remote") {
-          public void run() {
-
-            GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-            Set<DistributedMember> otherMemberSet =
-                cache.getDistributionManager().getOtherNormalDistributionManagerIds();
-
-            for (DistributedMember member : otherMemberSet) {
-              LockServiceMXBean bean = null;
-              try {
-                bean = MBeanUtil.getLockServiceMbeanProxy(member, LOCK_SERVICE_NAME);
-              } catch (Exception e) {
-                InternalDistributedSystem.getLoggerI18n()
-                    .fine("Undesired Result , LockServiceMBean Should not be null", e);
-
-              }
-              assertNotNull(bean);
-              String[] listHeldLock = bean.listHeldLocks();
-              assertEquals(listHeldLock.length, 1);
-              LogWriterUtils.getLogWriter().info("List Of Lock Object is  " + listHeldLock[0]);
-              Map<String, String> lockThreadMap = bean.listThreadsHoldingLock();
-              assertEquals(lockThreadMap.size(), 1);
-              LogWriterUtils.getLogWriter()
-                  .info("List Of Lock Thread is  " + lockThreadMap.toString());
-            }
-
-          }
-        };
-    vm.invoke(verifyLockDataRemote);
-  }
+  private LockServiceMXBean awaitLockServiceMXBeanProxy(final DistributedMember member,
+      final String lockServiceName) {
+    SystemManagementService service = this.managementTestRule.getSystemManagementService();
+    ObjectName lockServiceMXBeanName = service.getLockServiceMBeanName(member, lockServiceName);
 
-  protected void checkNavigation(final VM vm, final DistributedMember lockServiceMember) {
-    SerializableRunnable checkNavigation = new SerializableRunnable("Check Navigation") {
-      public void run() {
-
-        final ManagementService service = getManagementService();
-
-        DistributedSystemMXBean disMBean = service.getDistributedSystemMXBean();
-        try {
-          ObjectName expected = MBeanJMXAdapter.getDistributedLockServiceName(LOCK_SERVICE_NAME);
-          ObjectName actual = disMBean.fetchDistributedLockServiceObjectName(LOCK_SERVICE_NAME);
-          assertEquals(expected, actual);
-        } catch (Exception e) {
-          throw new AssertionError("Lock Service Navigation Failed ", e);
-        }
-
-        try {
-          ObjectName expected =
-              MBeanJMXAdapter.getLockServiceMBeanName(lockServiceMember.getId(), LOCK_SERVICE_NAME);
-          ObjectName actual =
-              disMBean.fetchLockServiceObjectName(lockServiceMember.getId(), LOCK_SERVICE_NAME);
-          assertEquals(expected, actual);
-        } catch (Exception e) {
-          throw new AssertionError("Lock Service Navigation Failed ", e);
-        }
+    await().until(
+        () -> assertThat(service.getMBeanProxy(lockServiceMXBeanName, LockServiceMXBean.class))
+            .isNotNull());
 
-      }
-    };
-    vm.invoke(checkNavigation);
+    return service.getMBeanProxy(lockServiceMXBeanName, LockServiceMXBean.class);
   }
 
   /**
-   * Verify Aggregate MBean
-   * 
-   * @param vm
+   * Await creation of local LockServiceMXBean for specified lockServiceName.
    */
-  @SuppressWarnings("serial")
-  protected void checkAggregate(final VM vm, final int expectedMembers) {
-    SerializableRunnable checkAggregate = new SerializableRunnable("Verify Aggregate MBean") {
-      public void run() {
-
-        final ManagementService service = getManagementService();
-        if (expectedMembers == 0) {
-          try {
-            Wait.waitForCriterion(new WaitCriterion() {
-
-              DistributedLockServiceMXBean bean = null;
+  private LockServiceMXBean awaitLockServiceMXBean(final String lockServiceName) {
+    SystemManagementService service = this.managementTestRule.getSystemManagementService();
 
-              public String description() {
-                return "Waiting for the proxy to get deleted at managing node";
-              }
+    await().until(() -> assertThat(service.getLocalLockServiceMBean(lockServiceName)).isNotNull());
 
-              public boolean done() {
-                bean = service.getDistributedLockServiceMXBean(LOCK_SERVICE_NAME);
-
-                boolean done = (bean == null);
-                return done;
-              }
-
-            }, MAX_WAIT, 500, true);
-
-          } catch (Exception e) {
-            throw new AssertionError("could not remove Aggregate Bean in required time", e);
-
-          }
-          return;
-        }
+    return service.getLocalLockServiceMBean(lockServiceName);
+  }
 
-        DistributedLockServiceMXBean bean = null;
-        try {
-          bean = MBeanUtil.getDistributedLockMbean(LOCK_SERVICE_NAME, expectedMembers);
-        } catch (Exception e) {
-          InternalDistributedSystem.getLoggerI18n()
-              .fine("Undesired Result , LockServiceMBean Should not be null", e);
+  /**
+   * Await destruction of local LockServiceMXBean for specified lockServiceName.
+   */
+  private void awaitLockServiceMXBeanIsNull(final String lockServiceName) {
+    SystemManagementService service = this.managementTestRule.getSystemManagementService();
 
-        }
-        assertNotNull(bean);
-        assertEquals(bean.getName(), LOCK_SERVICE_NAME);
+    await().until(() -> assertThat(service.getLocalLockServiceMBean(lockServiceName)).isNull());
+  }
 
-      }
-    };
-    vm.invoke(checkAggregate);
+  private ConditionFactory await() {
+    return Awaitility.await().atMost(MAX_WAIT_MILLIS, MILLISECONDS);
   }
 }
+


Mime
View raw message