geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [geode] 03/03: GEODE-3965: rename and cleanup DistributionManager tests
Date Fri, 05 Jan 2018 21:02:57 GMT
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 1f4c90794af85b4db6adeee86555f486ecf3f6e6
Author: Kirk Lund <klund@apache.org>
AuthorDate: Thu Jan 4 15:40:57 2018 -0800

    GEODE-3965: rename and cleanup DistributionManager tests
    
    * DistributionManagerTest -> ClusterDistributionManagerTest
    * DistributionManagerDUnitTest -> ClusterDistributionManagerDUnitTest
    * ConsoleDistributionManagerDUnitTest -> ClusterDistributionManagerForAdminDUnitTest
---
 .../ClusterDistributionManagerDUnitTest.java       | 399 +++++++++++++++
 ...lusterDistributionManagerForAdminDUnitTest.java | 317 ++++++++++++
 ...st.java => ClusterDistributionManagerTest.java} |   4 +-
 .../ConsoleDistributionManagerDUnitTest.java       | 446 -----------------
 .../internal/DistributionManagerDUnitTest.java     | 545 ---------------------
 5 files changed, 719 insertions(+), 992 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
new file mode 100644
index 0000000..0ba2381
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.distributed.ConfigurationProperties.ACK_SEVERE_ALERT_THRESHOLD;
+import static org.apache.geode.distributed.ConfigurationProperties.ACK_WAIT_THRESHOLD;
+import static org.apache.geode.distributed.ConfigurationProperties.BIND_ADDRESS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
+import static org.apache.geode.test.dunit.Assert.assertEquals;
+import static org.apache.geode.test.dunit.Assert.assertTrue;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral;
+import static org.apache.geode.test.dunit.Wait.pause;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
+
+import java.net.InetAddress;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.LogWriter;
+import org.apache.geode.admin.AdminDistributedSystem;
+import org.apache.geode.admin.AdminDistributedSystemFactory;
+import org.apache.geode.admin.AlertLevel;
+import org.apache.geode.admin.DistributedSystemConfig;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.MembershipManager;
+import org.apache.geode.distributed.internal.membership.NetView;
+import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
+import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.DistributedTestCase;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+/**
+ * This class tests the functionality of the {@link ClusterDistributionManager} class.
+ */
+@Category({DistributedTest.class, MembershipTest.class})
+public class ClusterDistributionManagerDUnitTest extends DistributedTestCase {
+  private static final Logger logger = LogService.getLogger();
+
+  private static volatile boolean regionDestroyedInvoked;
+  private static volatile Cache myCache;
+  private static volatile boolean alertReceived;
+
+  private transient ExecutorService executorService;
+
+  private VM vm1;
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  @Rule
+  public SharedErrorCollector errorCollector = new SharedErrorCollector();
+
+  @Before
+  public void setUp() throws Exception {
+    executorService = Executors.newSingleThreadExecutor();
+    vm1 = Host.getHost(0).getVM(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    disconnectAllFromDS();
+    invokeInEveryVM(() -> {
+      regionDestroyedInvoked = false;
+      myCache = null;
+      alertReceived = false;
+    });
+    assertThat(executorService.shutdownNow()).isEmpty();
+  }
+
+  @Test
+  public void testGetDistributionVMType() {
+    DistributionManager dm = getSystem().getDistributionManager();
+    InternalDistributedMember member = dm.getId();
+
+    assertEquals(ClusterDistributionManager.NORMAL_DM_TYPE, member.getVmKind());
+  }
+
+  /**
+   * Demonstrate that a new UDP port is used when an attempt is made to reconnect using a shunned
+   * port
+   */
+  @Test
+  public void testConnectAfterBeingShunned() {
+    InternalDistributedSystem system = getSystem();
+    MembershipManager membershipManager = MembershipManagerHelper.getMembershipManager(system);
+    InternalDistributedMember memberBefore = membershipManager.getLocalMember();
+
+    // TODO GMS needs to have a system property allowing the bind-port to be set
+    System.setProperty(GEMFIRE_PREFIX + "jg-bind-port", "" + memberBefore.getPort());
+    system.disconnect();
+    system = getSystem();
+    membershipManager = MembershipManagerHelper.getMembershipManager(system);
+    system.disconnect();
+    InternalDistributedMember memberAfter = membershipManager.getLocalMember();
+
+    assertThat(memberAfter.getPort()).isEqualTo(memberBefore.getPort());
+  }
+
+  /**
+   * Test the handling of "surprise members" in the membership manager. Create a DistributedSystem
+   * in this VM and then add a fake member to its surpriseMember set. Then ensure that it stays in
+   * the set when a new membership view arrives that doesn't contain it. Then wait until the member
+   * should be gone and force more view processing to have it scrubbed from the set.
+   **/
+  @Test
+  public void testSurpriseMemberHandling() throws Exception {
+    System.setProperty(GEMFIRE_PREFIX + "surprise-member-timeout", "3000");
+    InternalDistributedSystem system = getSystem();
+    GMSMembershipManager membershipManager =
+        (GMSMembershipManager) MembershipManagerHelper.getMembershipManager(system);
+    assertThat(membershipManager.isCleanupTimerStarted()).isTrue();
+
+    InternalDistributedMember member = new InternalDistributedMember(getIPLiteral(), 12345);
+
+    // first make sure we can't add this as a surprise member (bug #44566)
+    // if the view number isn't being recorded correctly the test will pass but the
+    // functionality is broken
+    assertThat(membershipManager.getView().getViewId()).isGreaterThan(0);
+
+    int oldViewId = member.getVmViewId();
+    member.setVmViewId(membershipManager.getView().getViewId() - 1);
+
+    addIgnoredException("attempt to add old member");
+    addIgnoredException("Removing shunned GemFire node");
+
+    boolean accepted = membershipManager.addSurpriseMember(member);
+    assertThat(accepted).as("member with old ID was not rejected (bug #44566)").isFalse();
+
+    member.setVmViewId(oldViewId);
+
+    // now forcibly add it as a surprise member and show that it is reaped
+    long gracePeriod = 5000;
+    long startTime = System.currentTimeMillis();
+    long timeout = membershipManager.getSurpriseMemberTimeout();
+    long birthTime = startTime - timeout + gracePeriod;
+    MembershipManagerHelper.addSurpriseMember(system, member, birthTime);
+    assertThat(membershipManager.isSurpriseMember(member)).as("Member was not a surprise member")
+        .isTrue();
+
+    await("waiting for member to be removed")
+        .atMost((timeout / 3) + gracePeriod, TimeUnit.MILLISECONDS)
+        .until(() -> !membershipManager.isSurpriseMember(member));
+  }
+
+  /**
+   * Tests that a severe-level alert is generated if a member does not respond with an ack quickly
+   * enough. vm0 and vm1 create a region and set ack-severe-alert-threshold. vm1 has a cache
+   * listener in its region that sleeps when notified, forcing the operation to take longer than
+   * ack-wait-threshold + ack-severe-alert-threshold
+   */
+  @Test
+  public void testAckSevereAlertThreshold() throws Exception {
+    // in order to set a small ack-wait-threshold, we have to remove the
+    // system property established by the dunit harness
+    System.clearProperty(GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD);
+
+    Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(ACK_WAIT_THRESHOLD, "3");
+    config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "3");
+    config.setProperty(NAME, "putter");
+
+    getSystem(config);
+    Region<String, String> region =
+        new RegionFactory<String, String>().setScope(Scope.DISTRIBUTED_ACK).setEarlyAck(false)
+            .setDataPolicy(DataPolicy.REPLICATE).create("testRegion");
+
+    vm1.invoke("Connect to distributed system", () -> {
+      config.setProperty(NAME, "sleeper");
+      getSystem(config);
+      addIgnoredException("elapsed while waiting for replies");
+
+      RegionFactory<String, String> regionFactory = new RegionFactory<>();
+      Region<String, String> sameRegion =
+          regionFactory.setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE)
+              .setEarlyAck(false).addCacheListener(getSleepingListener(false)).create("testRegion");
+      myCache = sameRegion.getCache();
+
+      createAlertListener();
+    });
+
+    // now we have two caches set up. vm1 has a listener that will sleep
+    // and cause the severe-alert threshold to be crossed
+
+    region.put("bomb", "pow!"); // this will hang until vm1 responds
+    disconnectAllFromDS();
+
+    vm1.invoke(() -> {
+      assertThat(alertReceived).isTrue();
+    });
+  }
+
+  /**
+   * Tests that a sick member is kicked out
+   */
+  @Test
+  public void testKickOutSickMember() throws Exception {
+    addIgnoredException("10 seconds have elapsed while waiting");
+
+    // in order to set a small ack-wait-threshold, we have to remove the
+    // system property established by the dunit harness
+    System.clearProperty(GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD);
+
+    Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0"); // loner
+    config.setProperty(ACK_WAIT_THRESHOLD, "5");
+    config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5");
+    config.setProperty(NAME, "putter");
+
+    getSystem(config);
+    Region<String, String> region = new RegionFactory<String, String>()
+        .setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE).create("testRegion");
+
+    addIgnoredException("sec have elapsed while waiting for replies");
+
+    vm1.invoke(new SerializableRunnable("Connect to distributed system") {
+      @Override
+      public void run() {
+        config.setProperty(NAME, "sleeper");
+        getSystem(config);
+
+        addIgnoredException("service failure");
+        addIgnoredException(ForcedDisconnectException.class.getName());
+
+        RegionFactory<String, String> regionFactory = new RegionFactory<>();
+        Region sameRegion =
+            regionFactory.setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE)
+                .addCacheListener(getSleepingListener(true)).create("testRegion");
+        myCache = sameRegion.getCache();
+      }
+    });
+
+    // now we have two caches set up, each having an alert listener. Vm1
+    // also has a cache listener that will turn off its ability to respond
+    // to "are you dead" messages and then sleep
+
+    region.put("bomb", "pow!");
+    disconnectFromDS();
+
+    vm1.invoke("wait for forced disconnect", () -> {
+      await("vm1's system should have been disconnected").atMost(1, MINUTES)
+          .until(() -> assertThat(basicGetSystem().isConnected()).isFalse());
+
+      await("vm1's cache is not closed").atMost(30, SECONDS)
+          .until(() -> assertThat(myCache.isClosed()).isTrue());
+
+      await("vm1's listener should have received afterRegionDestroyed notification")
+          .atMost(30, SECONDS).until(() -> assertThat(regionDestroyedInvoked).isTrue());
+    });
+  }
+
+  /**
+   * test use of a bad bind-address for bug #32565
+   */
+  @Test
+  public void testBadBindAddress() throws Exception {
+    Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0"); // loner
+    config.setProperty(ACK_WAIT_THRESHOLD, "5");
+    config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5");
+
+    // use a valid address that's not proper for this machine
+    config.setProperty(BIND_ADDRESS, "www.yahoo.com");
+    assertThatThrownBy(() -> getSystem(config)).isInstanceOf(IllegalArgumentException.class);
+
+    // use an invalid address
+    config.setProperty(BIND_ADDRESS, "bruce.schuchardt");
+    assertThatThrownBy(() -> getSystem(config)).isInstanceOf(IllegalArgumentException.class);
+
+    // use a valid bind address
+    config.setProperty(BIND_ADDRESS, InetAddress.getLocalHost().getCanonicalHostName());
+    assertThatCode(() -> getSystem()).doesNotThrowAnyException();
+  }
+
+  /**
+   * install a new view and show that waitForViewInstallation works as expected
+   */
+  @Test
+  public void testWaitForViewInstallation() {
+    InternalDistributedSystem system = getSystem(new Properties());
+    ClusterDistributionManager dm = (ClusterDistributionManager) system.getDM();
+    GMSMembershipManager membershipManager = (GMSMembershipManager) dm.getMembershipManager();
+    NetView view = membershipManager.getView();
+
+    AtomicBoolean waitForViewInstallationDone = new AtomicBoolean();
+    executorService.submit(() -> {
+      try {
+        dm.waitForViewInstallation(view.getViewId() + 1);
+        waitForViewInstallationDone.set(true);
+      } catch (InterruptedException e) {
+        errorCollector.addError(e);
+      }
+    });
+
+    pause(2000);
+
+    NetView newView = new NetView(view, view.getViewId() + 1);
+    membershipManager.installView(newView);
+
+    await().atMost(30, SECONDS).until(() -> assertThat(waitForViewInstallationDone.get()).isTrue());
+  }
+
+  private CacheListener<String, String> getSleepingListener(final boolean playDead) {
+    regionDestroyedInvoked = false;
+
+    return new CacheListenerAdapter<String, String>() {
+      @Override
+      public void afterCreate(EntryEvent event) {
+        try {
+          if (playDead) {
+            MembershipManagerHelper.beSickMember(getSystemStatic());
+            MembershipManagerHelper.playDead(getSystemStatic());
+          }
+          Thread.sleep(15 * 1000);
+        } catch (InterruptedException ie) {
+          errorCollector.addError(ie);
+        }
+      }
+
+      @Override
+      public void afterRegionDestroy(RegionEvent event) {
+        LogWriter logWriter = myCache.getLogger();
+        logWriter.info("afterRegionDestroyed invoked in sleeping listener");
+        logWriter.info("<ExpectedException action=remove>service failure</ExpectedException>");
+        logWriter.info(
+            "<ExpectedException action=remove>org.apache.geode.ForcedDisconnectException</ExpectedException>");
+        regionDestroyedInvoked = true;
+      }
+    };
+  }
+
+  private void createAlertListener() throws Exception {
+    DistributedSystemConfig config =
+        AdminDistributedSystemFactory.defineDistributedSystem(getSystemStatic(), null);
+    AdminDistributedSystem adminSystem = AdminDistributedSystemFactory.getDistributedSystem(config);
+    adminSystem.setAlertLevel(AlertLevel.SEVERE);
+    adminSystem.addAlertListener(alert -> {
+      try {
+        logger.info("alert listener invoked for alert originating in " + alert.getConnectionName());
+        logger.info("  alert text = " + alert.getMessage());
+        logger.info("  systemMember = " + alert.getSystemMember());
+      } catch (Exception e) {
+        errorCollector.addError(e);
+      }
+      alertReceived = true;
+    });
+    adminSystem.connect();
+    assertTrue(adminSystem.waitToBeConnected(5 * 1000));
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerForAdminDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerForAdminDUnitTest.java
new file mode 100644
index 0000000..37191ff
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerForAdminDUnitTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
+import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.net.InetAddress;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Config;
+import org.apache.geode.internal.admin.Alert;
+import org.apache.geode.internal.admin.AlertListener;
+import org.apache.geode.internal.admin.ApplicationVM;
+import org.apache.geode.internal.admin.DLockInfo;
+import org.apache.geode.internal.admin.EntryValueNode;
+import org.apache.geode.internal.admin.GemFireVM;
+import org.apache.geode.internal.admin.GfManagerAgent;
+import org.apache.geode.internal.admin.GfManagerAgentConfig;
+import org.apache.geode.internal.admin.GfManagerAgentFactory;
+import org.apache.geode.internal.admin.StatResource;
+import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.FlakyTest;
+
+/**
+ * This class tests the functionality of the {@linkplain org.apache.geode.internal.admin internal
+ * admin} API.
+ */
+@Category(DistributedTest.class)
+public class ClusterDistributionManagerForAdminDUnitTest extends CacheTestCase
+    implements AlertListener {
+
+  private static Logger logger = LogService.getLogger();
+
+  private transient GfManagerAgent agent;
+
+  @Before
+  public void setUp() throws Exception {
+    IgnoredException.addIgnoredException("Error occurred while reading system log");
+
+    ClusterDistributionManager.setIsDedicatedAdminVM(true);
+
+    populateCache();
+
+    boolean created = !isConnectedToDS();
+    InternalDistributedSystem ds = getSystem();
+    RemoteTransportConfig transport =
+        new RemoteTransportConfig(ds.getConfig(), ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
+    if (created) {
+      disconnectFromDS();
+    }
+
+    // create a GfManagerAgent in the master vm.
+    this.agent = GfManagerAgentFactory.getManagerAgent(
+        new GfManagerAgentConfig(null, transport, getLogWriter(), Alert.SEVERE, this, null));
+
+    await().atMost(1, MINUTES).until(() -> assertThat(agent.isConnected()).isTrue());
+  }
+
+  @After
+  public void preTearDownCacheTestCase() throws Exception {
+    try {
+      if (this.agent != null) {
+        this.agent.disconnect();
+      }
+      disconnectFromDS();
+    } finally {
+      ClusterDistributionManager.setIsDedicatedAdminVM(false);
+    }
+  }
+
+  @Test
+  public void testGetDistributionVMType() {
+    DistributionManager dm = this.agent.getDM();
+    assertThat(dm.getId().getVmKind()).isEqualTo(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
+  }
+
+  @Test
+  public void testAgent() {
+    assertThat(agent.listPeers()).hasSize(0);
+    assertThat(agent.isConnected()).isTrue();
+    agent.disconnect();
+    assertThat(agent.isConnected()).isFalse();
+  }
+
+  @Category(FlakyTest.class) // GEODE-1688
+  @Test
+  public void testApplications() throws Exception {
+    await().atMost(1, MINUTES)
+        .until(() -> assertThat(agent.listApplications().length).isGreaterThanOrEqualTo(4));
+
+    ApplicationVM[] applications = agent.listApplications();
+    for (int whichApplication = 0; whichApplication < applications.length; whichApplication++) {
+
+      InetAddress host = applications[whichApplication].getHost();
+      String appHostName = host.getHostName();
+
+      assertThat(host).isEqualTo(InetAddress.getByName(appHostName));
+
+      StatResource[] stats = applications[whichApplication].getStats(null);
+      assertThat(stats.length).isGreaterThan(0);
+
+      Config config = applications[whichApplication].getConfig();
+      String[] attributeNames = config.getAttributeNames();
+      boolean foundStatisticSamplingEnabled = false;
+      for (String attributeName : attributeNames) {
+        if (attributeName.equals(STATISTIC_SAMPLING_ENABLED)) {
+          foundStatisticSamplingEnabled = true;
+          assertThat(config.getAttribute(attributeName)).isEqualTo("true");
+          break;
+        }
+      }
+      assertThat(foundStatisticSamplingEnabled).isTrue();
+
+      String[] logs = applications[whichApplication].getSystemLogs();
+      assertThat(logs.length).isGreaterThan(0);
+
+      VM vm = findVMForAdminObject(applications[whichApplication]);
+      assertThat(vm).isNotNull();
+
+      String lockName = "cdm_testlock" + whichApplication;
+      assertThat(acquireDistributedLock(vm, lockName)).isTrue();
+
+      DLockInfo[] locks = applications[whichApplication].getDistributedLockInfo();
+      assertThat(locks.length).isGreaterThan(0);
+
+      boolean foundLock = false;
+      for (DLockInfo lock : locks) {
+        if (lock.getLockName().equals(lockName)) {
+          foundLock = true;
+          assertThat(lock.isAcquired()).isTrue();
+        }
+      }
+      assertThat(foundLock).isTrue();
+
+      Region[] roots = applications[whichApplication].getRootRegions();
+      assertThat(roots.length).isGreaterThan(0);
+
+      Region root = roots[0];
+      assertThat(root).isNotNull();
+      assertThat(root.getName()).isEqualTo("root");
+      assertThat(root.getFullPath()).isEqualTo("/root");
+
+      RegionAttributes attributes = root.getAttributes();
+      assertThat(attributes).isNotNull();
+      if (attributes.getStatisticsEnabled()) {
+        assertThat(root.getStatistics()).isNotNull();
+      }
+
+      Set subregions = root.subregions(false);
+      assertThat(subregions).hasSize(3);
+      assertThat(root.keySet()).hasSize(2);
+
+      Region.Entry entry = root.getEntry("cacheObj1");
+      assertThat(entry).isNotNull();
+      if (attributes.getStatisticsEnabled()) {
+        assertThat(entry.getStatistics()).isNotNull();
+      }
+      assertThat(entry.getValue()).isEqualTo("null");
+
+      /// test lightweight inspection;
+      entry = root.getEntry("cacheObj2");
+      assertThat(entry).isNotNull();
+
+      Object val = entry.getValue();
+      assertThat(val).isInstanceOf(String.class);
+      assertThat(((String) val)).contains("java.lang.StringBuffer");
+
+      /// test physical inspection
+      applications[whichApplication].setCacheInspectionMode(GemFireVM.PHYSICAL_CACHE_VALUE);
+      entry = root.getEntry("cacheObj2");
+      assertThat(entry).isNotNull();
+
+      val = entry.getValue();
+      assertThat(val).isInstanceOf(EntryValueNode.class);
+
+      EntryValueNode node = (EntryValueNode) val;
+      String type = node.getType();
+      assertThat(type).contains("java.lang.StringBuffer");
+      assertThat(node.isPrimitiveOrString()).isFalse();
+
+      EntryValueNode[] fields = node.getChildren();
+      assertThat(fields).isNotNull();
+
+      getLogWriter().warning(
+          "The tests use StringBuffers for values which might be implemented differently in jdk 1.5");
+
+      /// test destruction in the last valid app
+      int lastApplication = applications.length - 1;
+      if (whichApplication == lastApplication) {
+        int expectedSize = subregions.size() - 1;
+        Region subRegion = (Region) subregions.iterator().next();
+        Region rootRegion = root;
+        subRegion.destroyRegion();
+
+        await().atMost(30, SECONDS)
+            .until(() -> assertThat(rootRegion.subregions(false)).hasSize(expectedSize));
+      }
+    }
+  }
+
+  @Override
+  public void alert(Alert alert) {
+    getLogWriter().info("DEBUG: alert=" + alert);
+  }
+
+  private void populateCache() {
+    AttributesFactory attributesFactory = new AttributesFactory();
+    attributesFactory.setScope(Scope.DISTRIBUTED_NO_ACK);
+
+    RegionAttributes regionAttributes = attributesFactory.create();
+
+    for (int i = 0; i < Host.getHostCount(); i++) {
+      Host host = Host.getHost(i);
+
+      for (int j = 0; j < host.getVMCount(); j++) {
+        VM vm = host.getVM(j);
+        vm.invoke(() -> {
+          createRegion("cdm-testSubRegion1", regionAttributes);
+          createRegion("cdm-testSubRegion2", regionAttributes);
+          createRegion("cdm-testSubRegion3", regionAttributes);
+          remoteCreateEntry("", "cacheObj1", null);
+          StringBuffer val = new StringBuffer("userDefValue1");
+          remoteCreateEntry("", "cacheObj2", val);
+        });
+      }
+    }
+  }
+
+  /**
+   * Puts (or creates) a value in a region named <code>regionName</code> named
+   * <code>entryName</code>.
+   */
+  private void remoteCreateEntry(String regionName, String entryName, Object value)
+      throws CacheException {
+
+    Region root = getRootRegion();
+    Region region = root.getSubregion(regionName);
+    region.create(entryName, value);
+
+    logger.info("Put value " + value + " in entry " + entryName + " in region '"
+        + region.getFullPath() + "'");
+  }
+
+  private boolean acquireDistributedLock(VM vm, String lockName) {
+    return vm.invoke(() -> remoteAcquireDistLock(lockName));
+  }
+
+  private boolean remoteAcquireDistLock(String lockName) {
+    String serviceName = "cdmtest_service";
+    DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
+    if (service == null) {
+      service =
+          DistributedLockService.create(serviceName, InternalDistributedSystem.getAnyInstance());
+    }
+    assertThat(service).isNotNull();
+    return service.lock(lockName, 1000, 3000);
+  }
+
+  private VM findVMForAdminObject(GemFireVM gemFireVM) {
+    for (int i = 0; i < Host.getHostCount(); i++) {
+      Host host = Host.getHost(i);
+      for (int j = 0; j < host.getVMCount(); j++) {
+        VM vm = host.getVM(j);
+        InternalDistributedMember member = getJavaGroupsIdForVM(vm);
+        if (gemFireVM.getId().equals(member)) {
+          return vm;
+        }
+      }
+    }
+    return null;
+  }
+
+  private InternalDistributedMember getJavaGroupsIdForVM(VM vm) {
+    return vm.invoke(() -> remoteGetJavaGroupsIdForVM());
+  }
+
+  private InternalDistributedMember remoteGetJavaGroupsIdForVM() {
+    InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
+    return system.getDistributionManager().getDistributionManagerId();
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerTest.java
similarity index 97%
rename from geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerTest.java
rename to geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerTest.java
index 6050d68..a941cfc 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerTest.java
@@ -29,15 +29,17 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
-public class DistributionManagerTest {
+public class ClusterDistributionManagerTest {
 
   @Test
   public void shouldBeMockable() throws Exception {
     ClusterDistributionManager mockDistributionManager = mock(ClusterDistributionManager.class);
     InternalDistributedMember mockInternalDistributedMember = mock(InternalDistributedMember.class);
     Executor mockExecutor = mock(Executor.class);
+
     when(mockDistributionManager.getExecutor(anyInt(), eq(mockInternalDistributedMember)))
         .thenReturn(mockExecutor);
+
     assertThat(mockDistributionManager.getExecutor(1, mockInternalDistributedMember))
         .isSameAs(mockExecutor);
   }
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ConsoleDistributionManagerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ConsoleDistributionManagerDUnitTest.java
deleted file mode 100644
index 8e100a7..0000000
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/ConsoleDistributionManagerDUnitTest.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.distributed.internal;
-
-import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Set;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionAttributes;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.distributed.DistributedLockService;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Config;
-import org.apache.geode.internal.admin.Alert;
-import org.apache.geode.internal.admin.AlertListener;
-import org.apache.geode.internal.admin.ApplicationVM;
-import org.apache.geode.internal.admin.DLockInfo;
-import org.apache.geode.internal.admin.EntryValueNode;
-import org.apache.geode.internal.admin.GemFireVM;
-import org.apache.geode.internal.admin.GfManagerAgent;
-import org.apache.geode.internal.admin.GfManagerAgentConfig;
-import org.apache.geode.internal.admin.GfManagerAgentFactory;
-import org.apache.geode.internal.admin.StatResource;
-import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
-
-/**
- * This class tests the functionality of the {@linkplain org.apache.geode.internal.admin internal
- * admin} API.
- */
-@Category(DistributedTest.class)
-public class ConsoleDistributionManagerDUnitTest extends JUnit4CacheTestCase
-    implements AlertListener {
-
-  protected GfManagerAgent agent = null;
-  private static boolean firstTime = true;
-
-  public void alert(Alert alert) {
-    LogWriterUtils.getLogWriter().info("DEBUG: alert=" + alert);
-  }
-
-  @Override
-  public final void postSetUp() throws Exception {
-    boolean finishedSetup = false;
-    IgnoredException.addIgnoredException("Error occurred while reading system log");
-    try {
-      if (firstTime) {
-        disconnectFromDS(); // make sure there's no ldm lying around
-        try {
-          Thread.sleep(5 * 1000);
-        } catch (InterruptedException ie) {
-          fail("interrupted");
-        }
-        firstTime = false;
-      }
-
-      ClusterDistributionManager.setIsDedicatedAdminVM(true);
-
-      populateCache();
-
-      RemoteTransportConfig transport = null;
-      {
-        boolean created = !isConnectedToDS();
-        InternalDistributedSystem ds = getSystem();
-        transport = new RemoteTransportConfig(ds.getConfig(),
-            ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
-        if (created) {
-          disconnectFromDS();
-        }
-      }
-      // create a GfManagerAgent in the master vm.
-      this.agent = GfManagerAgentFactory.getManagerAgent(new GfManagerAgentConfig(null, transport,
-          LogWriterUtils.getLogWriter(), Alert.SEVERE, this, null));
-      if (!agent.isConnected()) {
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return agent.isConnected();
-          }
-
-          public String description() {
-            return null;
-          }
-        };
-        Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-      }
-      finishedSetup = true;
-    } finally {
-      if (!finishedSetup) {
-        try {
-          this.agent.disconnect();
-        } catch (VirtualMachineError e) {
-          SystemFailure.initiateFailure(e);
-          throw e;
-        } catch (Throwable ignore) {
-        }
-        try {
-          super.preTearDown();
-        } catch (VirtualMachineError e) {
-          SystemFailure.initiateFailure(e);
-          throw e;
-        } catch (Throwable ignore) {
-        }
-        try {
-          disconnectFromDS();
-        } catch (VirtualMachineError e) {
-          SystemFailure.initiateFailure(e);
-          throw e;
-        } catch (Throwable ignore) {
-        }
-        ClusterDistributionManager.setIsDedicatedAdminVM(false);
-      }
-    }
-  }
-
-  @Override
-  public final void preTearDownCacheTestCase() throws Exception {
-    this.agent.disconnect();
-  }
-
-  @Override
-  public final void postTearDownCacheTestCase() throws Exception {
-    try {
-      disconnectFromDS(); // make sure there's no ldm lying around
-    } finally {
-      ClusterDistributionManager.setIsDedicatedAdminVM(false);
-    }
-  }
-
-  @Test
-  public void testGetDistributionVMType() {
-    DistributionManager dm = this.agent.getDM();
-    InternalDistributedMember ipaddr = dm.getId();
-    assertEquals(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE, ipaddr.getVmKind());
-  }
-
-  @Test
-  public void testAgent() {
-    assertEquals("expected empty peer array", 0, agent.listPeers().length);
-    int systemCount = 0;
-    for (int h = 0; h < Host.getHostCount(); h++) {
-      Host host = Host.getHost(h);
-      systemCount += host.getSystemCount();
-    }
-    // note that JoinLeaveListener is not tested since it would require
-    // this test to start and stop systems.
-    agent.disconnect();
-    assertTrue("agent should have been disconnected", !agent.isConnected());
-  }
-
-  @Category(FlakyTest.class) // GEODE-1688
-  @Test
-  public void testApplications() throws Exception {
-    {
-      WaitCriterion ev = new WaitCriterion() {
-        public boolean done() {
-          ApplicationVM[] apps = agent.listApplications();
-          return apps.length >= 4;
-        }
-
-        public String description() {
-          return null;
-        }
-      };
-      Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-    }
-
-    // final Serializable controllerId = getSystem().getDistributionManager().getId(); //can't do
-    // this...
-    ApplicationVM[] apps = agent.listApplications();
-    for (int i = 0; i < apps.length; i++) {
-      // if (apps[i].getId().equals(controllerId)) {
-      // continue; // skip this one; its the locator vm
-      // }
-      InetAddress host = apps[i].getHost();
-      String appHostName = host.getHostName();
-      try {
-        InetAddress appHost = InetAddress.getByName(appHostName);
-        assertEquals(appHost, host);
-      } catch (UnknownHostException ex) {
-        fail("Lookup of address for host " + appHostName + " failed because " + ex);
-      }
-
-      StatResource[] stats = apps[i].getStats(null);
-      assertTrue(stats.length > 0);
-
-      Config conf = apps[i].getConfig();
-      String[] attNames = conf.getAttributeNames();
-      boolean foundIt = false;
-      for (int j = 0; j < attNames.length; j++) {
-        if (attNames[j].equals(STATISTIC_SAMPLING_ENABLED)) {
-          foundIt = true;
-          assertEquals(conf.getAttribute(attNames[j]), "true");
-          break;
-        }
-      }
-      assertTrue(foundIt);
-
-      String[] logs = apps[i].getSystemLogs();
-      assertTrue(logs.length > 0);
-      // assertTrue(logs[0].length() > 0);
-
-      {
-        VM vm = findVMForAdminObject(apps[i]);
-        // getLogWriter().info("DEBUG: found VM " + vm +
-        // " which corresponds to ApplicationVM "
-        // + apps[i] + " in testApplications");
-        assertNotNull(vm);
-        String lockName = "cdm_testlock" + i;
-        assertTrue(acquireDistLock(vm, lockName));
-        DLockInfo[] locks = apps[i].getDistributedLockInfo();
-        assertTrue(locks.length > 0);
-        boolean foundLock = false;
-        for (int j = 0; j < locks.length; j++) {
-          if (locks[j].getLockName().equals(lockName)) {
-            foundLock = true;
-            assertTrue(locks[j].isAcquired());
-          }
-        }
-        assertTrue(foundLock);
-      }
-
-      Region[] roots = apps[i].getRootRegions();
-      if (roots.length == 0) {
-        LogWriterUtils.getLogWriter().info(
-            "DEBUG: testApplications: apps[" + i + "]=" + apps[i] + " did not have a root region");
-      } else {
-        Region root = roots[0];
-        assertNotNull(root);
-        assertEquals("root", root.getName());
-        assertEquals("/root", root.getFullPath());
-        RegionAttributes attributes = root.getAttributes();
-        assertNotNull(attributes);
-        if (attributes.getStatisticsEnabled()) {
-          assertNotNull(root.getStatistics());
-        }
-        final Set subregions = root.subregions(false);
-        assertEquals(3, subregions.size());
-        assertEquals(2, root.keySet().size());
-        Region.Entry entry = root.getEntry("cacheObj1");
-        assertNotNull(entry);
-        if (attributes.getStatisticsEnabled()) {
-          assertNotNull(entry.getStatistics());
-        }
-        assertTrue(entry.getValue().equals("null"));
-
-        /// test lightweight inspection;
-        entry = root.getEntry("cacheObj2");
-        assertNotNull(entry);
-        Object val = entry.getValue();
-        assertTrue(val instanceof String);
-        assertTrue(((String) val).indexOf("java.lang.StringBuffer") != -1);
-
-        /// test physical inspection
-        // getLogWriter().info("DEBUG: Starting test of physical cache value inspection");
-        apps[i].setCacheInspectionMode(GemFireVM.PHYSICAL_CACHE_VALUE);
-        entry = root.getEntry("cacheObj2");
-        assertNotNull(entry);
-        val = entry.getValue();
-        assertTrue(val instanceof EntryValueNode);
-        EntryValueNode node = (EntryValueNode) val;
-        String type = node.getType();
-        assertTrue(type.indexOf("java.lang.StringBuffer") != -1);
-        assertTrue(!node.isPrimitiveOrString());
-        EntryValueNode[] fields = node.getChildren();
-        assertNotNull(fields);
-        LogWriterUtils.getLogWriter().warning(
-            "The tests use StringBuffers for values which might be implmented differently in jdk 1.5");
-        // assertTrue(fields.length > 0);
-
-        /// test destruction in the last valid app
-        int lastIdx = apps.length - 1;
-
-        /*
-         * if (i == lastIdx || (i == lastIdx-1 && apps[lastIdx].getId().equals(controllerId))) {
-         */
-
-        if (i == lastIdx) {
-          // getLogWriter().info("DEBUG: starting region destroy from admin apis");
-          final int expectedSize = subregions.size() - 1;
-          final Region r = (Region) subregions.iterator().next();
-          final Region rr = root;
-          r.destroyRegion();
-          WaitCriterion ev = new WaitCriterion() {
-            public boolean done() {
-              Set s = rr.subregions(false);
-              return s.size() == expectedSize;
-            }
-
-            public String description() {
-              return "Waited 20 seconds for region " + r.getFullPath() + "to be destroyed.";
-            }
-          };
-          Wait.waitForCriterion(ev, 20 * 1000, 200, true);
-        }
-      }
-    }
-  }
-
-  private void populateCache() {
-
-    AttributesFactory fact = new AttributesFactory();
-    fact.setScope(Scope.DISTRIBUTED_NO_ACK);
-    final RegionAttributes rAttr = fact.create();
-
-    final SerializableRunnable populator = new SerializableRunnable() {
-      public void run() {
-        try {
-          createRegion("cdm-testSubRegion1", rAttr);
-          createRegion("cdm-testSubRegion2", rAttr);
-          createRegion("cdm-testSubRegion3", rAttr);
-          remoteCreateEntry("", "cacheObj1", null);
-          StringBuffer val = new StringBuffer("userDefValue1");
-          remoteCreateEntry("", "cacheObj2", val);
-        } catch (CacheException ce) {
-          fail("Exception while populating cache:\n" + ce);
-        }
-      }
-    };
-
-    for (int h = 0; h < Host.getHostCount(); h++) {
-      Host host = Host.getHost(h);
-
-      for (int v = 0; v < host.getVMCount(); v++) {
-        VM vm = host.getVM(v);
-        vm.invoke(populator);
-      }
-    }
-  }
-
-
-
-  /**
-   * Puts (or creates) a value in a region named <code>regionName</code> named
-   * <code>entryName</code>.
-   */
-  protected void remoteCreateEntry(String regionName, String entryName, Object value)
-      throws CacheException {
-
-    Region root = getRootRegion();
-    Region region = root.getSubregion(regionName);
-    region.create(entryName, value);
-
-
-    LogWriterUtils.getLogWriter().info("Put value " + value + " in entry " + entryName
-        + " in region '" + region.getFullPath() + "'");
-
-  }
-
-  // private long getConId(VM vm) {
-  // return vm.invoke(() -> this.remoteGetConId());
-  // }
-  /**
-   * Accessed via reflection. DO NOT REMOVE
-   *
-   * @return
-   */
-  protected static long remoteGetConId() {
-    return InternalDistributedSystem.getAnyInstance().getId();
-  }
-
-  private boolean acquireDistLock(VM vm, String lockName) {
-    return vm.invoke(() -> remoteAcquireDistLock(lockName));
-  }
-
-  /**
-   * Accessed via reflection. DO NOT REMOVE
-   *
-   * @param lockName
-   * @return
-   */
-  protected static boolean remoteAcquireDistLock(String lockName) {
-    String serviceName = "cdmtest_service";
-    DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
-    if (service == null) {
-      service =
-          DistributedLockService.create(serviceName, InternalDistributedSystem.getAnyInstance());
-    }
-    assertNotNull(service);
-    try {
-      return service.lock(lockName, 1000, 3000);
-    } catch (Exception e) {
-      throw new RuntimeException("DEBUG: remoteAcquireDistLock", e);
-      // return false;
-    }
-  }
-
-  private VM findVMForAdminObject(GemFireVM adminObj) {
-    for (int i = 0; i < Host.getHostCount(); i++) {
-      Host host = Host.getHost(i);
-      for (int j = 0; j < host.getVMCount(); j++) {
-        VM vm = host.getVM(j);
-        InternalDistributedMember id = getJavaGroupsIdForVM(vm);
-        if (adminObj.getId().equals(id)) {
-          return vm;
-        }
-      }
-    }
-    return null;
-  }
-
-  private InternalDistributedMember getJavaGroupsIdForVM(VM vm) {
-    return (InternalDistributedMember) vm.invoke(() -> remoteGetJavaGroupsIdForVM());
-  }
-
-  /**
-   * INVOKED VIA REFLECTION
-   *
-   * @return
-   */
-  protected static InternalDistributedMember remoteGetJavaGroupsIdForVM() {
-    InternalDistributedSystem sys = InternalDistributedSystem.getAnyInstance();
-    return sys.getDistributionManager().getDistributionManagerId();
-
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerDUnitTest.java
deleted file mode 100644
index 6f8a421..0000000
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerDUnitTest.java
+++ /dev/null
@@ -1,545 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.distributed.internal;
-
-import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.apache.geode.test.dunit.Assert.*;
-
-import java.net.InetAddress;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-import org.awaitility.Awaitility;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.LogWriter;
-import org.apache.geode.admin.AdminDistributedSystem;
-import org.apache.geode.admin.AdminDistributedSystemFactory;
-import org.apache.geode.admin.Alert;
-import org.apache.geode.admin.AlertLevel;
-import org.apache.geode.admin.AlertListener;
-import org.apache.geode.admin.DistributedSystemConfig;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheListener;
-import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionEvent;
-import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.MembershipManager;
-import org.apache.geode.distributed.internal.membership.NetView;
-import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
-import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
-import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.NetworkUtils;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.MembershipTest;
-
-/**
- * This class tests the functionality of the {@link ClusterDistributionManager} class.
- */
-@Category({DistributedTest.class, MembershipTest.class})
-public class DistributionManagerDUnitTest extends JUnit4DistributedTestCase {
-  private static final Logger logger = LogService.getLogger();
-
-  public static DistributedSystem ds;
-
-  @Rule
-  public DistributedRestoreSystemProperties restoreSystemProperties =
-      new DistributedRestoreSystemProperties();
-
-  /**
-   * Clears the exceptionInThread flag in the given distribution manager.
-   */
-  public static void clearExceptionInThreads(ClusterDistributionManager dm) {
-    dm.clearExceptionInThreads();
-  }
-
-  @Override
-  public void preSetUp() throws Exception {
-    disconnectAllFromDS();
-  }
-
-  protected static class ItsOkayForMyClassNotToBeFound extends SerialDistributionMessage {
-
-    public int getDSFID() {
-      return NO_FIXED_ID;
-    }
-
-    @Override
-    protected void process(ClusterDistributionManager dm) {
-      // We should never get here
-    }
-  };
-
-  @Test
-  public void testGetDistributionVMType() {
-    DistributionManager dm = getSystem().getDistributionManager();
-    InternalDistributedMember ipaddr = dm.getId();
-    assertEquals(ClusterDistributionManager.NORMAL_DM_TYPE, ipaddr.getVmKind());
-  }
-
-  /**
-   * Send the distribution manager a message it can't deserialize
-   */
-  @Ignore
-  @Test
-  public void testExceptionInThreads() throws InterruptedException {
-    ClusterDistributionManager dm =
-        (ClusterDistributionManager) getSystem().getDistributionManager();
-    String p1 = "ItsOkayForMyClassNotToBeFound";
-    logger.info("<ExpectedException action=add>" + p1 + "</ExpectedException>");
-    DistributionMessage m = new ItsOkayForMyClassNotToBeFound();
-    dm.putOutgoing(m);
-    Thread.sleep(1 * 1000);
-    logger.info("<ExpectedException action=remove>" + p1 + "</ExpectedException>");
-    Awaitility.await("waiting for exceptionInThreads to be true").atMost(15, TimeUnit.SECONDS)
-        .until(() -> {
-          return dm.exceptionInThreads();
-        });
-    dm.clearExceptionInThreads();
-    assertTrue(!dm.exceptionInThreads());
-  }
-
-  /**
-   * Demonstrate that a new UDP port is used when an attempt is made to reconnect using a shunned
-   * port
-   */
-  @Test
-  public void testConnectAfterBeingShunned() {
-    InternalDistributedSystem sys = getSystem();
-    MembershipManager mgr = MembershipManagerHelper.getMembershipManager(sys);
-    InternalDistributedMember idm = mgr.getLocalMember();
-    // TODO GMS needs to have a system property allowing the bind-port to be set
-    System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "jg-bind-port", "" + idm.getPort());
-    sys.disconnect();
-    sys = getSystem();
-    mgr = MembershipManagerHelper.getMembershipManager(sys);
-    sys.disconnect();
-    InternalDistributedMember idm2 = mgr.getLocalMember();
-    org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-        .info("original ID=" + idm + " and after connecting=" + idm2);
-    assertTrue("should not have used a different udp port", idm.getPort() == idm2.getPort());
-  }
-
-  /**
-   * Test the handling of "surprise members" in the membership manager. Create a DistributedSystem
-   * in this VM and then add a fake member to its surpriseMember set. Then ensure that it stays in
-   * the set when a new membership view arrives that doesn't contain it. Then wait until the member
-   * should be gone and force more view processing to have it scrubbed from the set.
-   **/
-  @Test
-  public void testSurpriseMemberHandling() throws Exception {
-
-    System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "surprise-member-timeout", "3000");
-    InternalDistributedSystem sys = getSystem();
-    MembershipManager mgr = MembershipManagerHelper.getMembershipManager(sys);
-    assertTrue(((GMSMembershipManager) mgr).isCleanupTimerStarted());
-
-    try {
-      InternalDistributedMember mbr =
-          new InternalDistributedMember(NetworkUtils.getIPLiteral(), 12345);
-
-      // first make sure we can't add this as a surprise member (bug #44566)
-
-      // if the view number isn't being recorded correctly the test will pass but the
-      // functionality is broken
-      Assert.assertTrue("expected view ID to be greater than zero", mgr.getView().getViewId() > 0);
-
-      int oldViewId = mbr.getVmViewId();
-      mbr.setVmViewId((int) mgr.getView().getViewId() - 1);
-      org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-          .info("current membership view is " + mgr.getView());
-      org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-          .info("created ID " + mbr + " with view ID " + mbr.getVmViewId());
-
-      IgnoredException.addIgnoredException("attempt to add old member");
-      IgnoredException.addIgnoredException("Removing shunned GemFire node");
-
-      boolean accepted = mgr.addSurpriseMember(mbr);
-      Assert.assertTrue("member with old ID was not rejected (bug #44566)", !accepted);
-
-      mbr.setVmViewId(oldViewId);
-
-      // now forcibly add it as a surprise member and show that it is reaped
-      long gracePeriod = 5000;
-      long startTime = System.currentTimeMillis();
-      long timeout = ((GMSMembershipManager) mgr).getSurpriseMemberTimeout();
-      long birthTime = startTime - timeout + gracePeriod;
-      MembershipManagerHelper.addSurpriseMember(sys, mbr, birthTime);
-      assertTrue("Member was not a surprise member", mgr.isSurpriseMember(mbr));
-
-      // if (birthTime < (System.currentTimeMillis() - timeout)) {
-      // return; // machine is too busy and we didn't get enough CPU to perform more assertions
-      // }
-
-      Awaitility.await("waiting for member to be removed")
-          .atMost((timeout / 3) + gracePeriod, TimeUnit.MILLISECONDS)
-          .until(() -> !mgr.isSurpriseMember(mbr));
-
-    } finally {
-      if (sys != null && sys.isConnected()) {
-        sys.disconnect();
-      }
-    }
-  }
-
-  /**
-   * vm1 stores its cache in this static variable in testAckSeverAllertThreshold
-   */
-  static Cache myCache;
-
-  /**
-   * Tests that a severe-level alert is generated if a member does not respond with an ack quickly
-   * enough. vm0 and vm1 create a region and set ack-severe-alert-threshold. vm1 has a cache
-   * listener in its region that sleeps when notified, forcing the operation to take longer than
-   * ack-wait-threshold + ack-severe-alert-threshold
-   */
-  @Test
-  public void testAckSevereAlertThreshold() throws Exception {
-    disconnectAllFromDS();
-    Host host = Host.getHost(0);
-    // VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-
-    // in order to set a small ack-wait-threshold, we have to remove the
-    // system property established by the dunit harness
-    String oldAckWait = (String) System.getProperties()
-        .remove(DistributionConfig.GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD);
-
-    try {
-      final Properties props = getDistributedSystemProperties();
-      props.setProperty(MCAST_PORT, "0");
-      props.setProperty(ACK_WAIT_THRESHOLD, "3");
-      props.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "3");
-      props.setProperty(NAME, "putter");
-
-      getSystem(props);
-      Region rgn = (new RegionFactory()).setScope(Scope.DISTRIBUTED_ACK).setEarlyAck(false)
-          .setDataPolicy(DataPolicy.REPLICATE).create("testRegion");
-
-      vm1.invoke(new SerializableRunnable("Connect to distributed system") {
-        public void run() {
-          props.setProperty(NAME, "sleeper");
-          getSystem(props);
-          IgnoredException.addIgnoredException("elapsed while waiting for replies");
-          RegionFactory rf = new RegionFactory();
-          Region r = rf.setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE)
-              .setEarlyAck(false).addCacheListener(getSleepingListener(false)).create("testRegion");
-          myCache = r.getCache();
-          try {
-            createAlertListener();
-          } catch (Exception e) {
-            throw new RuntimeException("failed to create alert listener", e);
-          }
-        }
-      });
-
-      // now we have two caches set up. vm1 has a listener that will sleep
-      // and cause the severe-alert threshold to be crossed
-
-      rgn.put("bomb", "pow!"); // this will hang until vm1 responds
-
-      rgn.getCache().close();
-      basicGetSystem().disconnect();
-
-      vm1.invoke(new SerializableRunnable("disconnect from ds") {
-        public void run() {
-          if (!myCache.isClosed()) {
-            if (basicGetSystem().isConnected()) {
-              basicGetSystem().disconnect();
-            }
-            myCache = null;
-          }
-          if (basicGetSystem().isConnected()) {
-            basicGetSystem().disconnect();
-          }
-          synchronized (alertGuard) {
-            assertTrue(alertReceived);
-          }
-        }
-      });
-
-    } finally {
-      if (oldAckWait != null) {
-        System.setProperty(DistributionConfig.GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD, oldAckWait);
-      }
-    }
-  }
-
-  static volatile boolean regionDestroyedInvoked;
-
-  static CacheListener getSleepingListener(final boolean playDead) {
-    regionDestroyedInvoked = false;
-
-    return new CacheListenerAdapter() {
-      @Override
-      public void afterCreate(EntryEvent event) {
-        try {
-          if (playDead) {
-            MembershipManagerHelper.beSickMember(getSystemStatic());
-            MembershipManagerHelper.playDead(getSystemStatic());
-          }
-          Thread.sleep(15000);
-        } catch (InterruptedException ie) {
-          fail("interrupted", ie);
-        }
-      }
-
-      @Override
-      public void afterRegionDestroy(RegionEvent event) {
-        LogWriter logger = myCache.getLogger();
-        logger.info("afterRegionDestroyed invoked in sleeping listener");
-        logger.info("<ExpectedException action=remove>service failure</ExpectedException>");
-        logger.info(
-            "<ExpectedException action=remove>org.apache.geode.ForcedDisconnectException</ExpectedException>");
-        regionDestroyedInvoked = true;
-      }
-    };
-  }
-
-  static AdminDistributedSystem adminSystem;
-  static Object alertGuard = new Object();
-  static boolean alertReceived;
-
-  static void createAlertListener() throws Exception {
-    DistributedSystemConfig config =
-        AdminDistributedSystemFactory.defineDistributedSystem(getSystemStatic(), null);
-    adminSystem = AdminDistributedSystemFactory.getDistributedSystem(config);
-    adminSystem.setAlertLevel(AlertLevel.SEVERE);
-    adminSystem.addAlertListener(new AlertListener() {
-      public void alert(Alert alert) {
-        try {
-          logger
-              .info("alert listener invoked for alert originating in " + alert.getConnectionName());
-          logger.info("  alert text = " + alert.getMessage());
-          logger.info("  systemMember = " + alert.getSystemMember());
-        } catch (Exception e) {
-          logger.fatal("exception trying to use alert object", e);
-        }
-        synchronized (alertGuard) {
-          alertReceived = true;
-        }
-      }
-    });
-    adminSystem.connect();
-    assertTrue(adminSystem.waitToBeConnected(5 * 1000));
-  }
-
-  /**
-   * Tests that a sick member is kicked out
-   */
-  @Test
-  public void testKickOutSickMember() throws Exception {
-    disconnectAllFromDS();
-    IgnoredException.addIgnoredException("10 seconds have elapsed while waiting");
-    Host host = Host.getHost(0);
-    // VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-
-    // in order to set a small ack-wait-threshold, we have to remove the
-    // system property established by the dunit harness
-    String oldAckWait = (String) System.getProperties()
-        .remove(DistributionConfig.GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD);
-
-    try {
-      final Properties props = getDistributedSystemProperties();
-      props.setProperty(MCAST_PORT, "0"); // loner
-      props.setProperty(ACK_WAIT_THRESHOLD, "5");
-      props.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5");
-      props.setProperty(NAME, "putter");
-
-      getSystem(props);
-      Region rgn = (new RegionFactory()).setScope(Scope.DISTRIBUTED_ACK)
-          .setDataPolicy(DataPolicy.REPLICATE).create("testRegion");
-      basicGetSystem().getLogWriter().info(
-          "<ExpectedException action=add>sec have elapsed while waiting for replies</ExpectedException>");
-
-      vm1.invoke(new SerializableRunnable("Connect to distributed system") {
-        public void run() {
-          props.setProperty(NAME, "sleeper");
-          getSystem(props);
-          LogWriter log = basicGetSystem().getLogWriter();
-          log.info("<ExpectedException action=add>service failure</ExpectedException>");
-          log.info(
-              "<ExpectedException action=add>org.apache.geode.ForcedDisconnectException</ExpectedException>");
-          RegionFactory rf = new RegionFactory();
-          Region r = rf.setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE)
-              .addCacheListener(getSleepingListener(true)).create("testRegion");
-          myCache = r.getCache();
-        }
-      });
-
-      // now we have two caches set up, each having an alert listener. Vm1
-      // also has a cache listener that will turn off its ability to respond
-      // to "are you dead" messages and then sleep
-
-      rgn.put("bomb", "pow!");
-
-
-      rgn.getCache().close();
-      basicGetSystem().getLogWriter().info(
-          "<ExpectedException action=remove>sec have elapsed while waiting for replies</ExpectedException>");
-      basicGetSystem().disconnect();
-
-      vm1.invoke(new SerializableRunnable("wait for forced disconnect") {
-        public void run() {
-          // wait a while for the DS to finish disconnecting
-          WaitCriterion ev = new WaitCriterion() {
-            public boolean done() {
-              return !basicGetSystem().isConnected();
-            }
-
-            public String description() {
-              return null;
-            }
-          };
-          // if this fails it means the sick member wasn't kicked out and something is wrong
-          Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-
-          ev = new WaitCriterion() {
-            public boolean done() {
-              return myCache.isClosed();
-            }
-
-            public String description() {
-              return null;
-            }
-          };
-          Wait.waitForCriterion(ev, 20 * 1000, 200, false);
-
-          if (!myCache.isClosed()) {
-            if (basicGetSystem().isConnected()) {
-              basicGetSystem().disconnect();
-            }
-            myCache = null;
-            throw new RuntimeException("Test Failed - vm1's cache is not closed");
-          }
-          if (basicGetSystem().isConnected()) {
-            basicGetSystem().disconnect();
-            throw new RuntimeException("Test Failed - vm1's system should have been disconnected");
-          }
-
-          WaitCriterion wc = new WaitCriterion() {
-            public boolean done() {
-              return regionDestroyedInvoked;
-            }
-
-            public String description() {
-              return "vm1's listener should have received afterRegionDestroyed notification";
-            }
-          };
-          Wait.waitForCriterion(wc, 30 * 1000, 1000, true);
-
-        }
-      });
-
-    } finally {
-      if (oldAckWait != null) {
-        System.setProperty(DistributionConfig.GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD, oldAckWait);
-      }
-    }
-  }
-
-  /**
-   * test use of a bad bind-address for bug #32565
-   */
-  @Test
-  public void testBadBindAddress() throws Exception {
-    disconnectAllFromDS();
-
-    final Properties props = getDistributedSystemProperties();
-    props.setProperty(MCAST_PORT, "0"); // loner
-    // use a valid address that's not proper for this machine
-    props.setProperty(BIND_ADDRESS, "www.yahoo.com");
-    props.setProperty(ACK_WAIT_THRESHOLD, "5");
-    props.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5");
-    try {
-      getSystem(props);
-    } catch (IllegalArgumentException e) {
-      org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-          .info("caught expected exception (1)", e);
-    }
-    // use an invalid address
-    props.setProperty(BIND_ADDRESS, "bruce.schuchardt");
-    try {
-      getSystem(props);
-    } catch (IllegalArgumentException e) {
-      org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-          .info("caught expected exception (2_", e);
-    }
-    // use a valid bind address
-    props.setProperty(BIND_ADDRESS, InetAddress.getLocalHost().getCanonicalHostName());
-    getSystem().disconnect();
-  }
-
-  /**
-   * install a new view and show that waitForViewInstallation works as expected
-   */
-  @Test
-  public void testWaitForViewInstallation() {
-    getSystem(new Properties());
-
-    MembershipManager mgr = basicGetSystem().getDM().getMembershipManager();
-
-    final NetView v = mgr.getView();
-
-    final boolean[] passed = new boolean[1];
-    Thread t = new Thread("wait for view installation") {
-      public void run() {
-        try {
-          ((ClusterDistributionManager) basicGetSystem().getDM())
-              .waitForViewInstallation(v.getViewId() + 1);
-          synchronized (passed) {
-            passed[0] = true;
-          }
-        } catch (InterruptedException e) {
-          // failed
-        }
-      }
-    };
-    t.setDaemon(true);
-    t.start();
-
-    Wait.pause(2000);
-
-    NetView newView = new NetView(v, v.getViewId() + 1);
-    ((Manager) mgr).installView(newView);
-
-    Wait.pause(2000);
-
-    synchronized (passed) {
-      Assert.assertTrue(passed[0]);
-    }
-  }
-}

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.

Mime
View raw message