From commits-return-25179-archive-asf-public=cust-asf.ponee.io@geode.apache.org Fri Jan 5 22:03:02 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 0BF2B180647 for ; Fri, 5 Jan 2018 22:03:02 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id EF73B160C15; Fri, 5 Jan 2018 21:03:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1BFBD160C3B for ; Fri, 5 Jan 2018 22:02:59 +0100 (CET) Received: (qmail 51976 invoked by uid 500); 5 Jan 2018 21:02:59 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 51962 invoked by uid 99); 5 Jan 2018 21:02:59 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Jan 2018 21:02:59 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 283FC88202; Fri, 5 Jan 2018 21:02:57 +0000 (UTC) Date: Fri, 05 Jan 2018 21:02:57 +0000 To: "commits@geode.apache.org" Subject: [geode] 03/03: GEODE-3965: rename and cleanup DistributionManager tests MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: klund@apache.org In-Reply-To: <151518617444.29562.1945460220233712524@gitbox.apache.org> References: <151518617444.29562.1945460220233712524@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/develop X-Git-Reftype: branch X-Git-Rev: 1f4c90794af85b4db6adeee86555f486ecf3f6e6 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180105210257.283FC88202@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git commit 1f4c90794af85b4db6adeee86555f486ecf3f6e6 Author: Kirk Lund AuthorDate: Thu Jan 4 15:40:57 2018 -0800 GEODE-3965: rename and cleanup DistributionManager tests * DistributionManagerTest -> ClusterDistributionManagerTest * DistributionManagerDUnitTest -> ClusterDistributionManagerDUnitTest * ConsoleDistributionManagerDUnitTest -> ClusterDistributionManagerForAdminDUnitTest --- .../ClusterDistributionManagerDUnitTest.java | 399 +++++++++++++++ ...lusterDistributionManagerForAdminDUnitTest.java | 317 ++++++++++++ ...st.java => ClusterDistributionManagerTest.java} | 4 +- .../ConsoleDistributionManagerDUnitTest.java | 446 ----------------- .../internal/DistributionManagerDUnitTest.java | 545 --------------------- 5 files changed, 719 insertions(+), 992 deletions(-) diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java new file mode 100644 index 0000000..0ba2381 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerDUnitTest.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.distributed.ConfigurationProperties.ACK_SEVERE_ALERT_THRESHOLD; +import static org.apache.geode.distributed.ConfigurationProperties.ACK_WAIT_THRESHOLD; +import static org.apache.geode.distributed.ConfigurationProperties.BIND_ADDRESS; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.NAME; +import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX; +import static org.apache.geode.test.dunit.Assert.assertEquals; +import static org.apache.geode.test.dunit.Assert.assertTrue; +import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; +import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM; +import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral; +import static org.apache.geode.test.dunit.Wait.pause; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +import java.net.InetAddress; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.ForcedDisconnectException; +import org.apache.geode.LogWriter; +import org.apache.geode.admin.AdminDistributedSystem; +import org.apache.geode.admin.AdminDistributedSystemFactory; +import org.apache.geode.admin.AlertLevel; +import org.apache.geode.admin.DistributedSystemConfig; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheListener; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.Scope; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.distributed.internal.membership.MembershipManager; +import org.apache.geode.distributed.internal.membership.NetView; +import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper; +import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.test.dunit.DistributedTestCase; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; +import org.apache.geode.test.dunit.rules.SharedErrorCollector; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.categories.MembershipTest; + +/** + * This class tests the functionality of the {@link ClusterDistributionManager} class. + */ +@Category({DistributedTest.class, MembershipTest.class}) +public class ClusterDistributionManagerDUnitTest extends DistributedTestCase { + private static final Logger logger = LogService.getLogger(); + + private static volatile boolean regionDestroyedInvoked; + private static volatile Cache myCache; + private static volatile boolean alertReceived; + + private transient ExecutorService executorService; + + private VM vm1; + + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); + + @Rule + public SharedErrorCollector errorCollector = new SharedErrorCollector(); + + @Before + public void setUp() throws Exception { + executorService = Executors.newSingleThreadExecutor(); + vm1 = Host.getHost(0).getVM(1); + } + + @After + public void tearDown() throws Exception { + disconnectAllFromDS(); + invokeInEveryVM(() -> { + regionDestroyedInvoked = false; + myCache = null; + alertReceived = false; + }); + assertThat(executorService.shutdownNow()).isEmpty(); + } + + @Test + public void testGetDistributionVMType() { + DistributionManager dm = getSystem().getDistributionManager(); + InternalDistributedMember member = dm.getId(); + + assertEquals(ClusterDistributionManager.NORMAL_DM_TYPE, member.getVmKind()); + } + + /** + * Demonstrate that a new UDP port is used when an attempt is made to reconnect using a shunned + * port + */ + @Test + public void testConnectAfterBeingShunned() { + InternalDistributedSystem system = getSystem(); + MembershipManager membershipManager = MembershipManagerHelper.getMembershipManager(system); + InternalDistributedMember memberBefore = membershipManager.getLocalMember(); + + // TODO GMS needs to have a system property allowing the bind-port to be set + System.setProperty(GEMFIRE_PREFIX + "jg-bind-port", "" + memberBefore.getPort()); + system.disconnect(); + system = getSystem(); + membershipManager = MembershipManagerHelper.getMembershipManager(system); + system.disconnect(); + InternalDistributedMember memberAfter = membershipManager.getLocalMember(); + + assertThat(memberAfter.getPort()).isEqualTo(memberBefore.getPort()); + } + + /** + * Test the handling of "surprise members" in the membership manager. Create a DistributedSystem + * in this VM and then add a fake member to its surpriseMember set. Then ensure that it stays in + * the set when a new membership view arrives that doesn't contain it. Then wait until the member + * should be gone and force more view processing to have it scrubbed from the set. + **/ + @Test + public void testSurpriseMemberHandling() throws Exception { + System.setProperty(GEMFIRE_PREFIX + "surprise-member-timeout", "3000"); + InternalDistributedSystem system = getSystem(); + GMSMembershipManager membershipManager = + (GMSMembershipManager) MembershipManagerHelper.getMembershipManager(system); + assertThat(membershipManager.isCleanupTimerStarted()).isTrue(); + + InternalDistributedMember member = new InternalDistributedMember(getIPLiteral(), 12345); + + // first make sure we can't add this as a surprise member (bug #44566) + // if the view number isn't being recorded correctly the test will pass but the + // functionality is broken + assertThat(membershipManager.getView().getViewId()).isGreaterThan(0); + + int oldViewId = member.getVmViewId(); + member.setVmViewId(membershipManager.getView().getViewId() - 1); + + addIgnoredException("attempt to add old member"); + addIgnoredException("Removing shunned GemFire node"); + + boolean accepted = membershipManager.addSurpriseMember(member); + assertThat(accepted).as("member with old ID was not rejected (bug #44566)").isFalse(); + + member.setVmViewId(oldViewId); + + // now forcibly add it as a surprise member and show that it is reaped + long gracePeriod = 5000; + long startTime = System.currentTimeMillis(); + long timeout = membershipManager.getSurpriseMemberTimeout(); + long birthTime = startTime - timeout + gracePeriod; + MembershipManagerHelper.addSurpriseMember(system, member, birthTime); + assertThat(membershipManager.isSurpriseMember(member)).as("Member was not a surprise member") + .isTrue(); + + await("waiting for member to be removed") + .atMost((timeout / 3) + gracePeriod, TimeUnit.MILLISECONDS) + .until(() -> !membershipManager.isSurpriseMember(member)); + } + + /** + * Tests that a severe-level alert is generated if a member does not respond with an ack quickly + * enough. vm0 and vm1 create a region and set ack-severe-alert-threshold. vm1 has a cache + * listener in its region that sleeps when notified, forcing the operation to take longer than + * ack-wait-threshold + ack-severe-alert-threshold + */ + @Test + public void testAckSevereAlertThreshold() throws Exception { + // in order to set a small ack-wait-threshold, we have to remove the + // system property established by the dunit harness + System.clearProperty(GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD); + + Properties config = getDistributedSystemProperties(); + config.setProperty(MCAST_PORT, "0"); + config.setProperty(ACK_WAIT_THRESHOLD, "3"); + config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "3"); + config.setProperty(NAME, "putter"); + + getSystem(config); + Region region = + new RegionFactory().setScope(Scope.DISTRIBUTED_ACK).setEarlyAck(false) + .setDataPolicy(DataPolicy.REPLICATE).create("testRegion"); + + vm1.invoke("Connect to distributed system", () -> { + config.setProperty(NAME, "sleeper"); + getSystem(config); + addIgnoredException("elapsed while waiting for replies"); + + RegionFactory regionFactory = new RegionFactory<>(); + Region sameRegion = + regionFactory.setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE) + .setEarlyAck(false).addCacheListener(getSleepingListener(false)).create("testRegion"); + myCache = sameRegion.getCache(); + + createAlertListener(); + }); + + // now we have two caches set up. vm1 has a listener that will sleep + // and cause the severe-alert threshold to be crossed + + region.put("bomb", "pow!"); // this will hang until vm1 responds + disconnectAllFromDS(); + + vm1.invoke(() -> { + assertThat(alertReceived).isTrue(); + }); + } + + /** + * Tests that a sick member is kicked out + */ + @Test + public void testKickOutSickMember() throws Exception { + addIgnoredException("10 seconds have elapsed while waiting"); + + // in order to set a small ack-wait-threshold, we have to remove the + // system property established by the dunit harness + System.clearProperty(GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD); + + Properties config = getDistributedSystemProperties(); + config.setProperty(MCAST_PORT, "0"); // loner + config.setProperty(ACK_WAIT_THRESHOLD, "5"); + config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5"); + config.setProperty(NAME, "putter"); + + getSystem(config); + Region region = new RegionFactory() + .setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE).create("testRegion"); + + addIgnoredException("sec have elapsed while waiting for replies"); + + vm1.invoke(new SerializableRunnable("Connect to distributed system") { + @Override + public void run() { + config.setProperty(NAME, "sleeper"); + getSystem(config); + + addIgnoredException("service failure"); + addIgnoredException(ForcedDisconnectException.class.getName()); + + RegionFactory regionFactory = new RegionFactory<>(); + Region sameRegion = + regionFactory.setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE) + .addCacheListener(getSleepingListener(true)).create("testRegion"); + myCache = sameRegion.getCache(); + } + }); + + // now we have two caches set up, each having an alert listener. Vm1 + // also has a cache listener that will turn off its ability to respond + // to "are you dead" messages and then sleep + + region.put("bomb", "pow!"); + disconnectFromDS(); + + vm1.invoke("wait for forced disconnect", () -> { + await("vm1's system should have been disconnected").atMost(1, MINUTES) + .until(() -> assertThat(basicGetSystem().isConnected()).isFalse()); + + await("vm1's cache is not closed").atMost(30, SECONDS) + .until(() -> assertThat(myCache.isClosed()).isTrue()); + + await("vm1's listener should have received afterRegionDestroyed notification") + .atMost(30, SECONDS).until(() -> assertThat(regionDestroyedInvoked).isTrue()); + }); + } + + /** + * test use of a bad bind-address for bug #32565 + */ + @Test + public void testBadBindAddress() throws Exception { + Properties config = getDistributedSystemProperties(); + config.setProperty(MCAST_PORT, "0"); // loner + config.setProperty(ACK_WAIT_THRESHOLD, "5"); + config.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5"); + + // use a valid address that's not proper for this machine + config.setProperty(BIND_ADDRESS, "www.yahoo.com"); + assertThatThrownBy(() -> getSystem(config)).isInstanceOf(IllegalArgumentException.class); + + // use an invalid address + config.setProperty(BIND_ADDRESS, "bruce.schuchardt"); + assertThatThrownBy(() -> getSystem(config)).isInstanceOf(IllegalArgumentException.class); + + // use a valid bind address + config.setProperty(BIND_ADDRESS, InetAddress.getLocalHost().getCanonicalHostName()); + assertThatCode(() -> getSystem()).doesNotThrowAnyException(); + } + + /** + * install a new view and show that waitForViewInstallation works as expected + */ + @Test + public void testWaitForViewInstallation() { + InternalDistributedSystem system = getSystem(new Properties()); + ClusterDistributionManager dm = (ClusterDistributionManager) system.getDM(); + GMSMembershipManager membershipManager = (GMSMembershipManager) dm.getMembershipManager(); + NetView view = membershipManager.getView(); + + AtomicBoolean waitForViewInstallationDone = new AtomicBoolean(); + executorService.submit(() -> { + try { + dm.waitForViewInstallation(view.getViewId() + 1); + waitForViewInstallationDone.set(true); + } catch (InterruptedException e) { + errorCollector.addError(e); + } + }); + + pause(2000); + + NetView newView = new NetView(view, view.getViewId() + 1); + membershipManager.installView(newView); + + await().atMost(30, SECONDS).until(() -> assertThat(waitForViewInstallationDone.get()).isTrue()); + } + + private CacheListener getSleepingListener(final boolean playDead) { + regionDestroyedInvoked = false; + + return new CacheListenerAdapter() { + @Override + public void afterCreate(EntryEvent event) { + try { + if (playDead) { + MembershipManagerHelper.beSickMember(getSystemStatic()); + MembershipManagerHelper.playDead(getSystemStatic()); + } + Thread.sleep(15 * 1000); + } catch (InterruptedException ie) { + errorCollector.addError(ie); + } + } + + @Override + public void afterRegionDestroy(RegionEvent event) { + LogWriter logWriter = myCache.getLogger(); + logWriter.info("afterRegionDestroyed invoked in sleeping listener"); + logWriter.info("service failure"); + logWriter.info( + "org.apache.geode.ForcedDisconnectException"); + regionDestroyedInvoked = true; + } + }; + } + + private void createAlertListener() throws Exception { + DistributedSystemConfig config = + AdminDistributedSystemFactory.defineDistributedSystem(getSystemStatic(), null); + AdminDistributedSystem adminSystem = AdminDistributedSystemFactory.getDistributedSystem(config); + adminSystem.setAlertLevel(AlertLevel.SEVERE); + adminSystem.addAlertListener(alert -> { + try { + logger.info("alert listener invoked for alert originating in " + alert.getConnectionName()); + logger.info(" alert text = " + alert.getMessage()); + logger.info(" systemMember = " + alert.getSystemMember()); + } catch (Exception e) { + errorCollector.addError(e); + } + alertReceived = true; + }); + adminSystem.connect(); + assertTrue(adminSystem.waitToBeConnected(5 * 1000)); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerForAdminDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerForAdminDUnitTest.java new file mode 100644 index 0000000..37191ff --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerForAdminDUnitTest.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED; +import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.net.InetAddress; +import java.util.Set; + +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.Scope; +import org.apache.geode.distributed.DistributedLockService; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Config; +import org.apache.geode.internal.admin.Alert; +import org.apache.geode.internal.admin.AlertListener; +import org.apache.geode.internal.admin.ApplicationVM; +import org.apache.geode.internal.admin.DLockInfo; +import org.apache.geode.internal.admin.EntryValueNode; +import org.apache.geode.internal.admin.GemFireVM; +import org.apache.geode.internal.admin.GfManagerAgent; +import org.apache.geode.internal.admin.GfManagerAgentConfig; +import org.apache.geode.internal.admin.GfManagerAgentFactory; +import org.apache.geode.internal.admin.StatResource; +import org.apache.geode.internal.admin.remote.RemoteTransportConfig; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.cache.CacheTestCase; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.categories.FlakyTest; + +/** + * This class tests the functionality of the {@linkplain org.apache.geode.internal.admin internal + * admin} API. + */ +@Category(DistributedTest.class) +public class ClusterDistributionManagerForAdminDUnitTest extends CacheTestCase + implements AlertListener { + + private static Logger logger = LogService.getLogger(); + + private transient GfManagerAgent agent; + + @Before + public void setUp() throws Exception { + IgnoredException.addIgnoredException("Error occurred while reading system log"); + + ClusterDistributionManager.setIsDedicatedAdminVM(true); + + populateCache(); + + boolean created = !isConnectedToDS(); + InternalDistributedSystem ds = getSystem(); + RemoteTransportConfig transport = + new RemoteTransportConfig(ds.getConfig(), ClusterDistributionManager.ADMIN_ONLY_DM_TYPE); + if (created) { + disconnectFromDS(); + } + + // create a GfManagerAgent in the master vm. + this.agent = GfManagerAgentFactory.getManagerAgent( + new GfManagerAgentConfig(null, transport, getLogWriter(), Alert.SEVERE, this, null)); + + await().atMost(1, MINUTES).until(() -> assertThat(agent.isConnected()).isTrue()); + } + + @After + public void preTearDownCacheTestCase() throws Exception { + try { + if (this.agent != null) { + this.agent.disconnect(); + } + disconnectFromDS(); + } finally { + ClusterDistributionManager.setIsDedicatedAdminVM(false); + } + } + + @Test + public void testGetDistributionVMType() { + DistributionManager dm = this.agent.getDM(); + assertThat(dm.getId().getVmKind()).isEqualTo(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE); + } + + @Test + public void testAgent() { + assertThat(agent.listPeers()).hasSize(0); + assertThat(agent.isConnected()).isTrue(); + agent.disconnect(); + assertThat(agent.isConnected()).isFalse(); + } + + @Category(FlakyTest.class) // GEODE-1688 + @Test + public void testApplications() throws Exception { + await().atMost(1, MINUTES) + .until(() -> assertThat(agent.listApplications().length).isGreaterThanOrEqualTo(4)); + + ApplicationVM[] applications = agent.listApplications(); + for (int whichApplication = 0; whichApplication < applications.length; whichApplication++) { + + InetAddress host = applications[whichApplication].getHost(); + String appHostName = host.getHostName(); + + assertThat(host).isEqualTo(InetAddress.getByName(appHostName)); + + StatResource[] stats = applications[whichApplication].getStats(null); + assertThat(stats.length).isGreaterThan(0); + + Config config = applications[whichApplication].getConfig(); + String[] attributeNames = config.getAttributeNames(); + boolean foundStatisticSamplingEnabled = false; + for (String attributeName : attributeNames) { + if (attributeName.equals(STATISTIC_SAMPLING_ENABLED)) { + foundStatisticSamplingEnabled = true; + assertThat(config.getAttribute(attributeName)).isEqualTo("true"); + break; + } + } + assertThat(foundStatisticSamplingEnabled).isTrue(); + + String[] logs = applications[whichApplication].getSystemLogs(); + assertThat(logs.length).isGreaterThan(0); + + VM vm = findVMForAdminObject(applications[whichApplication]); + assertThat(vm).isNotNull(); + + String lockName = "cdm_testlock" + whichApplication; + assertThat(acquireDistributedLock(vm, lockName)).isTrue(); + + DLockInfo[] locks = applications[whichApplication].getDistributedLockInfo(); + assertThat(locks.length).isGreaterThan(0); + + boolean foundLock = false; + for (DLockInfo lock : locks) { + if (lock.getLockName().equals(lockName)) { + foundLock = true; + assertThat(lock.isAcquired()).isTrue(); + } + } + assertThat(foundLock).isTrue(); + + Region[] roots = applications[whichApplication].getRootRegions(); + assertThat(roots.length).isGreaterThan(0); + + Region root = roots[0]; + assertThat(root).isNotNull(); + assertThat(root.getName()).isEqualTo("root"); + assertThat(root.getFullPath()).isEqualTo("/root"); + + RegionAttributes attributes = root.getAttributes(); + assertThat(attributes).isNotNull(); + if (attributes.getStatisticsEnabled()) { + assertThat(root.getStatistics()).isNotNull(); + } + + Set subregions = root.subregions(false); + assertThat(subregions).hasSize(3); + assertThat(root.keySet()).hasSize(2); + + Region.Entry entry = root.getEntry("cacheObj1"); + assertThat(entry).isNotNull(); + if (attributes.getStatisticsEnabled()) { + assertThat(entry.getStatistics()).isNotNull(); + } + assertThat(entry.getValue()).isEqualTo("null"); + + /// test lightweight inspection; + entry = root.getEntry("cacheObj2"); + assertThat(entry).isNotNull(); + + Object val = entry.getValue(); + assertThat(val).isInstanceOf(String.class); + assertThat(((String) val)).contains("java.lang.StringBuffer"); + + /// test physical inspection + applications[whichApplication].setCacheInspectionMode(GemFireVM.PHYSICAL_CACHE_VALUE); + entry = root.getEntry("cacheObj2"); + assertThat(entry).isNotNull(); + + val = entry.getValue(); + assertThat(val).isInstanceOf(EntryValueNode.class); + + EntryValueNode node = (EntryValueNode) val; + String type = node.getType(); + assertThat(type).contains("java.lang.StringBuffer"); + assertThat(node.isPrimitiveOrString()).isFalse(); + + EntryValueNode[] fields = node.getChildren(); + assertThat(fields).isNotNull(); + + getLogWriter().warning( + "The tests use StringBuffers for values which might be implemented differently in jdk 1.5"); + + /// test destruction in the last valid app + int lastApplication = applications.length - 1; + if (whichApplication == lastApplication) { + int expectedSize = subregions.size() - 1; + Region subRegion = (Region) subregions.iterator().next(); + Region rootRegion = root; + subRegion.destroyRegion(); + + await().atMost(30, SECONDS) + .until(() -> assertThat(rootRegion.subregions(false)).hasSize(expectedSize)); + } + } + } + + @Override + public void alert(Alert alert) { + getLogWriter().info("DEBUG: alert=" + alert); + } + + private void populateCache() { + AttributesFactory attributesFactory = new AttributesFactory(); + attributesFactory.setScope(Scope.DISTRIBUTED_NO_ACK); + + RegionAttributes regionAttributes = attributesFactory.create(); + + for (int i = 0; i < Host.getHostCount(); i++) { + Host host = Host.getHost(i); + + for (int j = 0; j < host.getVMCount(); j++) { + VM vm = host.getVM(j); + vm.invoke(() -> { + createRegion("cdm-testSubRegion1", regionAttributes); + createRegion("cdm-testSubRegion2", regionAttributes); + createRegion("cdm-testSubRegion3", regionAttributes); + remoteCreateEntry("", "cacheObj1", null); + StringBuffer val = new StringBuffer("userDefValue1"); + remoteCreateEntry("", "cacheObj2", val); + }); + } + } + } + + /** + * Puts (or creates) a value in a region named regionName named + * entryName. + */ + private void remoteCreateEntry(String regionName, String entryName, Object value) + throws CacheException { + + Region root = getRootRegion(); + Region region = root.getSubregion(regionName); + region.create(entryName, value); + + logger.info("Put value " + value + " in entry " + entryName + " in region '" + + region.getFullPath() + "'"); + } + + private boolean acquireDistributedLock(VM vm, String lockName) { + return vm.invoke(() -> remoteAcquireDistLock(lockName)); + } + + private boolean remoteAcquireDistLock(String lockName) { + String serviceName = "cdmtest_service"; + DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); + if (service == null) { + service = + DistributedLockService.create(serviceName, InternalDistributedSystem.getAnyInstance()); + } + assertThat(service).isNotNull(); + return service.lock(lockName, 1000, 3000); + } + + private VM findVMForAdminObject(GemFireVM gemFireVM) { + for (int i = 0; i < Host.getHostCount(); i++) { + Host host = Host.getHost(i); + for (int j = 0; j < host.getVMCount(); j++) { + VM vm = host.getVM(j); + InternalDistributedMember member = getJavaGroupsIdForVM(vm); + if (gemFireVM.getId().equals(member)) { + return vm; + } + } + } + return null; + } + + private InternalDistributedMember getJavaGroupsIdForVM(VM vm) { + return vm.invoke(() -> remoteGetJavaGroupsIdForVM()); + } + + private InternalDistributedMember remoteGetJavaGroupsIdForVM() { + InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance(); + return system.getDistributionManager().getDistributionManagerId(); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerTest.java similarity index 97% rename from geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerTest.java rename to geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerTest.java index 6050d68..a941cfc 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterDistributionManagerTest.java @@ -29,15 +29,17 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) -public class DistributionManagerTest { +public class ClusterDistributionManagerTest { @Test public void shouldBeMockable() throws Exception { ClusterDistributionManager mockDistributionManager = mock(ClusterDistributionManager.class); InternalDistributedMember mockInternalDistributedMember = mock(InternalDistributedMember.class); Executor mockExecutor = mock(Executor.class); + when(mockDistributionManager.getExecutor(anyInt(), eq(mockInternalDistributedMember))) .thenReturn(mockExecutor); + assertThat(mockDistributionManager.getExecutor(1, mockInternalDistributedMember)) .isSameAs(mockExecutor); } diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ConsoleDistributionManagerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ConsoleDistributionManagerDUnitTest.java deleted file mode 100644 index 8e100a7..0000000 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/ConsoleDistributionManagerDUnitTest.java +++ /dev/null @@ -1,446 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.distributed.internal; - -import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Set; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.SystemFailure; -import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.CacheException; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionAttributes; -import org.apache.geode.cache.Scope; -import org.apache.geode.distributed.DistributedLockService; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.Config; -import org.apache.geode.internal.admin.Alert; -import org.apache.geode.internal.admin.AlertListener; -import org.apache.geode.internal.admin.ApplicationVM; -import org.apache.geode.internal.admin.DLockInfo; -import org.apache.geode.internal.admin.EntryValueNode; -import org.apache.geode.internal.admin.GemFireVM; -import org.apache.geode.internal.admin.GfManagerAgent; -import org.apache.geode.internal.admin.GfManagerAgentConfig; -import org.apache.geode.internal.admin.GfManagerAgentFactory; -import org.apache.geode.internal.admin.StatResource; -import org.apache.geode.internal.admin.remote.RemoteTransportConfig; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.IgnoredException; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.SerializableRunnable; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.categories.FlakyTest; - -/** - * This class tests the functionality of the {@linkplain org.apache.geode.internal.admin internal - * admin} API. - */ -@Category(DistributedTest.class) -public class ConsoleDistributionManagerDUnitTest extends JUnit4CacheTestCase - implements AlertListener { - - protected GfManagerAgent agent = null; - private static boolean firstTime = true; - - public void alert(Alert alert) { - LogWriterUtils.getLogWriter().info("DEBUG: alert=" + alert); - } - - @Override - public final void postSetUp() throws Exception { - boolean finishedSetup = false; - IgnoredException.addIgnoredException("Error occurred while reading system log"); - try { - if (firstTime) { - disconnectFromDS(); // make sure there's no ldm lying around - try { - Thread.sleep(5 * 1000); - } catch (InterruptedException ie) { - fail("interrupted"); - } - firstTime = false; - } - - ClusterDistributionManager.setIsDedicatedAdminVM(true); - - populateCache(); - - RemoteTransportConfig transport = null; - { - boolean created = !isConnectedToDS(); - InternalDistributedSystem ds = getSystem(); - transport = new RemoteTransportConfig(ds.getConfig(), - ClusterDistributionManager.ADMIN_ONLY_DM_TYPE); - if (created) { - disconnectFromDS(); - } - } - // create a GfManagerAgent in the master vm. - this.agent = GfManagerAgentFactory.getManagerAgent(new GfManagerAgentConfig(null, transport, - LogWriterUtils.getLogWriter(), Alert.SEVERE, this, null)); - if (!agent.isConnected()) { - WaitCriterion ev = new WaitCriterion() { - public boolean done() { - return agent.isConnected(); - } - - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - } - finishedSetup = true; - } finally { - if (!finishedSetup) { - try { - this.agent.disconnect(); - } catch (VirtualMachineError e) { - SystemFailure.initiateFailure(e); - throw e; - } catch (Throwable ignore) { - } - try { - super.preTearDown(); - } catch (VirtualMachineError e) { - SystemFailure.initiateFailure(e); - throw e; - } catch (Throwable ignore) { - } - try { - disconnectFromDS(); - } catch (VirtualMachineError e) { - SystemFailure.initiateFailure(e); - throw e; - } catch (Throwable ignore) { - } - ClusterDistributionManager.setIsDedicatedAdminVM(false); - } - } - } - - @Override - public final void preTearDownCacheTestCase() throws Exception { - this.agent.disconnect(); - } - - @Override - public final void postTearDownCacheTestCase() throws Exception { - try { - disconnectFromDS(); // make sure there's no ldm lying around - } finally { - ClusterDistributionManager.setIsDedicatedAdminVM(false); - } - } - - @Test - public void testGetDistributionVMType() { - DistributionManager dm = this.agent.getDM(); - InternalDistributedMember ipaddr = dm.getId(); - assertEquals(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE, ipaddr.getVmKind()); - } - - @Test - public void testAgent() { - assertEquals("expected empty peer array", 0, agent.listPeers().length); - int systemCount = 0; - for (int h = 0; h < Host.getHostCount(); h++) { - Host host = Host.getHost(h); - systemCount += host.getSystemCount(); - } - // note that JoinLeaveListener is not tested since it would require - // this test to start and stop systems. - agent.disconnect(); - assertTrue("agent should have been disconnected", !agent.isConnected()); - } - - @Category(FlakyTest.class) // GEODE-1688 - @Test - public void testApplications() throws Exception { - { - WaitCriterion ev = new WaitCriterion() { - public boolean done() { - ApplicationVM[] apps = agent.listApplications(); - return apps.length >= 4; - } - - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - } - - // final Serializable controllerId = getSystem().getDistributionManager().getId(); //can't do - // this... - ApplicationVM[] apps = agent.listApplications(); - for (int i = 0; i < apps.length; i++) { - // if (apps[i].getId().equals(controllerId)) { - // continue; // skip this one; its the locator vm - // } - InetAddress host = apps[i].getHost(); - String appHostName = host.getHostName(); - try { - InetAddress appHost = InetAddress.getByName(appHostName); - assertEquals(appHost, host); - } catch (UnknownHostException ex) { - fail("Lookup of address for host " + appHostName + " failed because " + ex); - } - - StatResource[] stats = apps[i].getStats(null); - assertTrue(stats.length > 0); - - Config conf = apps[i].getConfig(); - String[] attNames = conf.getAttributeNames(); - boolean foundIt = false; - for (int j = 0; j < attNames.length; j++) { - if (attNames[j].equals(STATISTIC_SAMPLING_ENABLED)) { - foundIt = true; - assertEquals(conf.getAttribute(attNames[j]), "true"); - break; - } - } - assertTrue(foundIt); - - String[] logs = apps[i].getSystemLogs(); - assertTrue(logs.length > 0); - // assertTrue(logs[0].length() > 0); - - { - VM vm = findVMForAdminObject(apps[i]); - // getLogWriter().info("DEBUG: found VM " + vm + - // " which corresponds to ApplicationVM " - // + apps[i] + " in testApplications"); - assertNotNull(vm); - String lockName = "cdm_testlock" + i; - assertTrue(acquireDistLock(vm, lockName)); - DLockInfo[] locks = apps[i].getDistributedLockInfo(); - assertTrue(locks.length > 0); - boolean foundLock = false; - for (int j = 0; j < locks.length; j++) { - if (locks[j].getLockName().equals(lockName)) { - foundLock = true; - assertTrue(locks[j].isAcquired()); - } - } - assertTrue(foundLock); - } - - Region[] roots = apps[i].getRootRegions(); - if (roots.length == 0) { - LogWriterUtils.getLogWriter().info( - "DEBUG: testApplications: apps[" + i + "]=" + apps[i] + " did not have a root region"); - } else { - Region root = roots[0]; - assertNotNull(root); - assertEquals("root", root.getName()); - assertEquals("/root", root.getFullPath()); - RegionAttributes attributes = root.getAttributes(); - assertNotNull(attributes); - if (attributes.getStatisticsEnabled()) { - assertNotNull(root.getStatistics()); - } - final Set subregions = root.subregions(false); - assertEquals(3, subregions.size()); - assertEquals(2, root.keySet().size()); - Region.Entry entry = root.getEntry("cacheObj1"); - assertNotNull(entry); - if (attributes.getStatisticsEnabled()) { - assertNotNull(entry.getStatistics()); - } - assertTrue(entry.getValue().equals("null")); - - /// test lightweight inspection; - entry = root.getEntry("cacheObj2"); - assertNotNull(entry); - Object val = entry.getValue(); - assertTrue(val instanceof String); - assertTrue(((String) val).indexOf("java.lang.StringBuffer") != -1); - - /// test physical inspection - // getLogWriter().info("DEBUG: Starting test of physical cache value inspection"); - apps[i].setCacheInspectionMode(GemFireVM.PHYSICAL_CACHE_VALUE); - entry = root.getEntry("cacheObj2"); - assertNotNull(entry); - val = entry.getValue(); - assertTrue(val instanceof EntryValueNode); - EntryValueNode node = (EntryValueNode) val; - String type = node.getType(); - assertTrue(type.indexOf("java.lang.StringBuffer") != -1); - assertTrue(!node.isPrimitiveOrString()); - EntryValueNode[] fields = node.getChildren(); - assertNotNull(fields); - LogWriterUtils.getLogWriter().warning( - "The tests use StringBuffers for values which might be implmented differently in jdk 1.5"); - // assertTrue(fields.length > 0); - - /// test destruction in the last valid app - int lastIdx = apps.length - 1; - - /* - * if (i == lastIdx || (i == lastIdx-1 && apps[lastIdx].getId().equals(controllerId))) { - */ - - if (i == lastIdx) { - // getLogWriter().info("DEBUG: starting region destroy from admin apis"); - final int expectedSize = subregions.size() - 1; - final Region r = (Region) subregions.iterator().next(); - final Region rr = root; - r.destroyRegion(); - WaitCriterion ev = new WaitCriterion() { - public boolean done() { - Set s = rr.subregions(false); - return s.size() == expectedSize; - } - - public String description() { - return "Waited 20 seconds for region " + r.getFullPath() + "to be destroyed."; - } - }; - Wait.waitForCriterion(ev, 20 * 1000, 200, true); - } - } - } - } - - private void populateCache() { - - AttributesFactory fact = new AttributesFactory(); - fact.setScope(Scope.DISTRIBUTED_NO_ACK); - final RegionAttributes rAttr = fact.create(); - - final SerializableRunnable populator = new SerializableRunnable() { - public void run() { - try { - createRegion("cdm-testSubRegion1", rAttr); - createRegion("cdm-testSubRegion2", rAttr); - createRegion("cdm-testSubRegion3", rAttr); - remoteCreateEntry("", "cacheObj1", null); - StringBuffer val = new StringBuffer("userDefValue1"); - remoteCreateEntry("", "cacheObj2", val); - } catch (CacheException ce) { - fail("Exception while populating cache:\n" + ce); - } - } - }; - - for (int h = 0; h < Host.getHostCount(); h++) { - Host host = Host.getHost(h); - - for (int v = 0; v < host.getVMCount(); v++) { - VM vm = host.getVM(v); - vm.invoke(populator); - } - } - } - - - - /** - * Puts (or creates) a value in a region named regionName named - * entryName. - */ - protected void remoteCreateEntry(String regionName, String entryName, Object value) - throws CacheException { - - Region root = getRootRegion(); - Region region = root.getSubregion(regionName); - region.create(entryName, value); - - - LogWriterUtils.getLogWriter().info("Put value " + value + " in entry " + entryName - + " in region '" + region.getFullPath() + "'"); - - } - - // private long getConId(VM vm) { - // return vm.invoke(() -> this.remoteGetConId()); - // } - /** - * Accessed via reflection. DO NOT REMOVE - * - * @return - */ - protected static long remoteGetConId() { - return InternalDistributedSystem.getAnyInstance().getId(); - } - - private boolean acquireDistLock(VM vm, String lockName) { - return vm.invoke(() -> remoteAcquireDistLock(lockName)); - } - - /** - * Accessed via reflection. DO NOT REMOVE - * - * @param lockName - * @return - */ - protected static boolean remoteAcquireDistLock(String lockName) { - String serviceName = "cdmtest_service"; - DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); - if (service == null) { - service = - DistributedLockService.create(serviceName, InternalDistributedSystem.getAnyInstance()); - } - assertNotNull(service); - try { - return service.lock(lockName, 1000, 3000); - } catch (Exception e) { - throw new RuntimeException("DEBUG: remoteAcquireDistLock", e); - // return false; - } - } - - private VM findVMForAdminObject(GemFireVM adminObj) { - for (int i = 0; i < Host.getHostCount(); i++) { - Host host = Host.getHost(i); - for (int j = 0; j < host.getVMCount(); j++) { - VM vm = host.getVM(j); - InternalDistributedMember id = getJavaGroupsIdForVM(vm); - if (adminObj.getId().equals(id)) { - return vm; - } - } - } - return null; - } - - private InternalDistributedMember getJavaGroupsIdForVM(VM vm) { - return (InternalDistributedMember) vm.invoke(() -> remoteGetJavaGroupsIdForVM()); - } - - /** - * INVOKED VIA REFLECTION - * - * @return - */ - protected static InternalDistributedMember remoteGetJavaGroupsIdForVM() { - InternalDistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - return sys.getDistributionManager().getDistributionManagerId(); - - } -} diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerDUnitTest.java deleted file mode 100644 index 6f8a421..0000000 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionManagerDUnitTest.java +++ /dev/null @@ -1,545 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.distributed.internal; - -import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.apache.geode.test.dunit.Assert.*; - -import java.net.InetAddress; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.apache.logging.log4j.Logger; -import org.awaitility.Awaitility; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.LogWriter; -import org.apache.geode.admin.AdminDistributedSystem; -import org.apache.geode.admin.AdminDistributedSystemFactory; -import org.apache.geode.admin.Alert; -import org.apache.geode.admin.AlertLevel; -import org.apache.geode.admin.AlertListener; -import org.apache.geode.admin.DistributedSystemConfig; -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheListener; -import org.apache.geode.cache.DataPolicy; -import org.apache.geode.cache.EntryEvent; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionEvent; -import org.apache.geode.cache.RegionFactory; -import org.apache.geode.cache.Scope; -import org.apache.geode.cache.util.CacheListenerAdapter; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.distributed.internal.membership.MembershipManager; -import org.apache.geode.distributed.internal.membership.NetView; -import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper; -import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager; -import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager; -import org.apache.geode.internal.logging.LogService; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.IgnoredException; -import org.apache.geode.test.dunit.NetworkUtils; -import org.apache.geode.test.dunit.SerializableRunnable; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; -import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.categories.MembershipTest; - -/** - * This class tests the functionality of the {@link ClusterDistributionManager} class. - */ -@Category({DistributedTest.class, MembershipTest.class}) -public class DistributionManagerDUnitTest extends JUnit4DistributedTestCase { - private static final Logger logger = LogService.getLogger(); - - public static DistributedSystem ds; - - @Rule - public DistributedRestoreSystemProperties restoreSystemProperties = - new DistributedRestoreSystemProperties(); - - /** - * Clears the exceptionInThread flag in the given distribution manager. - */ - public static void clearExceptionInThreads(ClusterDistributionManager dm) { - dm.clearExceptionInThreads(); - } - - @Override - public void preSetUp() throws Exception { - disconnectAllFromDS(); - } - - protected static class ItsOkayForMyClassNotToBeFound extends SerialDistributionMessage { - - public int getDSFID() { - return NO_FIXED_ID; - } - - @Override - protected void process(ClusterDistributionManager dm) { - // We should never get here - } - }; - - @Test - public void testGetDistributionVMType() { - DistributionManager dm = getSystem().getDistributionManager(); - InternalDistributedMember ipaddr = dm.getId(); - assertEquals(ClusterDistributionManager.NORMAL_DM_TYPE, ipaddr.getVmKind()); - } - - /** - * Send the distribution manager a message it can't deserialize - */ - @Ignore - @Test - public void testExceptionInThreads() throws InterruptedException { - ClusterDistributionManager dm = - (ClusterDistributionManager) getSystem().getDistributionManager(); - String p1 = "ItsOkayForMyClassNotToBeFound"; - logger.info("" + p1 + ""); - DistributionMessage m = new ItsOkayForMyClassNotToBeFound(); - dm.putOutgoing(m); - Thread.sleep(1 * 1000); - logger.info("" + p1 + ""); - Awaitility.await("waiting for exceptionInThreads to be true").atMost(15, TimeUnit.SECONDS) - .until(() -> { - return dm.exceptionInThreads(); - }); - dm.clearExceptionInThreads(); - assertTrue(!dm.exceptionInThreads()); - } - - /** - * Demonstrate that a new UDP port is used when an attempt is made to reconnect using a shunned - * port - */ - @Test - public void testConnectAfterBeingShunned() { - InternalDistributedSystem sys = getSystem(); - MembershipManager mgr = MembershipManagerHelper.getMembershipManager(sys); - InternalDistributedMember idm = mgr.getLocalMember(); - // TODO GMS needs to have a system property allowing the bind-port to be set - System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "jg-bind-port", "" + idm.getPort()); - sys.disconnect(); - sys = getSystem(); - mgr = MembershipManagerHelper.getMembershipManager(sys); - sys.disconnect(); - InternalDistributedMember idm2 = mgr.getLocalMember(); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("original ID=" + idm + " and after connecting=" + idm2); - assertTrue("should not have used a different udp port", idm.getPort() == idm2.getPort()); - } - - /** - * Test the handling of "surprise members" in the membership manager. Create a DistributedSystem - * in this VM and then add a fake member to its surpriseMember set. Then ensure that it stays in - * the set when a new membership view arrives that doesn't contain it. Then wait until the member - * should be gone and force more view processing to have it scrubbed from the set. - **/ - @Test - public void testSurpriseMemberHandling() throws Exception { - - System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "surprise-member-timeout", "3000"); - InternalDistributedSystem sys = getSystem(); - MembershipManager mgr = MembershipManagerHelper.getMembershipManager(sys); - assertTrue(((GMSMembershipManager) mgr).isCleanupTimerStarted()); - - try { - InternalDistributedMember mbr = - new InternalDistributedMember(NetworkUtils.getIPLiteral(), 12345); - - // first make sure we can't add this as a surprise member (bug #44566) - - // if the view number isn't being recorded correctly the test will pass but the - // functionality is broken - Assert.assertTrue("expected view ID to be greater than zero", mgr.getView().getViewId() > 0); - - int oldViewId = mbr.getVmViewId(); - mbr.setVmViewId((int) mgr.getView().getViewId() - 1); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("current membership view is " + mgr.getView()); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("created ID " + mbr + " with view ID " + mbr.getVmViewId()); - - IgnoredException.addIgnoredException("attempt to add old member"); - IgnoredException.addIgnoredException("Removing shunned GemFire node"); - - boolean accepted = mgr.addSurpriseMember(mbr); - Assert.assertTrue("member with old ID was not rejected (bug #44566)", !accepted); - - mbr.setVmViewId(oldViewId); - - // now forcibly add it as a surprise member and show that it is reaped - long gracePeriod = 5000; - long startTime = System.currentTimeMillis(); - long timeout = ((GMSMembershipManager) mgr).getSurpriseMemberTimeout(); - long birthTime = startTime - timeout + gracePeriod; - MembershipManagerHelper.addSurpriseMember(sys, mbr, birthTime); - assertTrue("Member was not a surprise member", mgr.isSurpriseMember(mbr)); - - // if (birthTime < (System.currentTimeMillis() - timeout)) { - // return; // machine is too busy and we didn't get enough CPU to perform more assertions - // } - - Awaitility.await("waiting for member to be removed") - .atMost((timeout / 3) + gracePeriod, TimeUnit.MILLISECONDS) - .until(() -> !mgr.isSurpriseMember(mbr)); - - } finally { - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - } - - /** - * vm1 stores its cache in this static variable in testAckSeverAllertThreshold - */ - static Cache myCache; - - /** - * Tests that a severe-level alert is generated if a member does not respond with an ack quickly - * enough. vm0 and vm1 create a region and set ack-severe-alert-threshold. vm1 has a cache - * listener in its region that sleeps when notified, forcing the operation to take longer than - * ack-wait-threshold + ack-severe-alert-threshold - */ - @Test - public void testAckSevereAlertThreshold() throws Exception { - disconnectAllFromDS(); - Host host = Host.getHost(0); - // VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - - // in order to set a small ack-wait-threshold, we have to remove the - // system property established by the dunit harness - String oldAckWait = (String) System.getProperties() - .remove(DistributionConfig.GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD); - - try { - final Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(ACK_WAIT_THRESHOLD, "3"); - props.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "3"); - props.setProperty(NAME, "putter"); - - getSystem(props); - Region rgn = (new RegionFactory()).setScope(Scope.DISTRIBUTED_ACK).setEarlyAck(false) - .setDataPolicy(DataPolicy.REPLICATE).create("testRegion"); - - vm1.invoke(new SerializableRunnable("Connect to distributed system") { - public void run() { - props.setProperty(NAME, "sleeper"); - getSystem(props); - IgnoredException.addIgnoredException("elapsed while waiting for replies"); - RegionFactory rf = new RegionFactory(); - Region r = rf.setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE) - .setEarlyAck(false).addCacheListener(getSleepingListener(false)).create("testRegion"); - myCache = r.getCache(); - try { - createAlertListener(); - } catch (Exception e) { - throw new RuntimeException("failed to create alert listener", e); - } - } - }); - - // now we have two caches set up. vm1 has a listener that will sleep - // and cause the severe-alert threshold to be crossed - - rgn.put("bomb", "pow!"); // this will hang until vm1 responds - - rgn.getCache().close(); - basicGetSystem().disconnect(); - - vm1.invoke(new SerializableRunnable("disconnect from ds") { - public void run() { - if (!myCache.isClosed()) { - if (basicGetSystem().isConnected()) { - basicGetSystem().disconnect(); - } - myCache = null; - } - if (basicGetSystem().isConnected()) { - basicGetSystem().disconnect(); - } - synchronized (alertGuard) { - assertTrue(alertReceived); - } - } - }); - - } finally { - if (oldAckWait != null) { - System.setProperty(DistributionConfig.GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD, oldAckWait); - } - } - } - - static volatile boolean regionDestroyedInvoked; - - static CacheListener getSleepingListener(final boolean playDead) { - regionDestroyedInvoked = false; - - return new CacheListenerAdapter() { - @Override - public void afterCreate(EntryEvent event) { - try { - if (playDead) { - MembershipManagerHelper.beSickMember(getSystemStatic()); - MembershipManagerHelper.playDead(getSystemStatic()); - } - Thread.sleep(15000); - } catch (InterruptedException ie) { - fail("interrupted", ie); - } - } - - @Override - public void afterRegionDestroy(RegionEvent event) { - LogWriter logger = myCache.getLogger(); - logger.info("afterRegionDestroyed invoked in sleeping listener"); - logger.info("service failure"); - logger.info( - "org.apache.geode.ForcedDisconnectException"); - regionDestroyedInvoked = true; - } - }; - } - - static AdminDistributedSystem adminSystem; - static Object alertGuard = new Object(); - static boolean alertReceived; - - static void createAlertListener() throws Exception { - DistributedSystemConfig config = - AdminDistributedSystemFactory.defineDistributedSystem(getSystemStatic(), null); - adminSystem = AdminDistributedSystemFactory.getDistributedSystem(config); - adminSystem.setAlertLevel(AlertLevel.SEVERE); - adminSystem.addAlertListener(new AlertListener() { - public void alert(Alert alert) { - try { - logger - .info("alert listener invoked for alert originating in " + alert.getConnectionName()); - logger.info(" alert text = " + alert.getMessage()); - logger.info(" systemMember = " + alert.getSystemMember()); - } catch (Exception e) { - logger.fatal("exception trying to use alert object", e); - } - synchronized (alertGuard) { - alertReceived = true; - } - } - }); - adminSystem.connect(); - assertTrue(adminSystem.waitToBeConnected(5 * 1000)); - } - - /** - * Tests that a sick member is kicked out - */ - @Test - public void testKickOutSickMember() throws Exception { - disconnectAllFromDS(); - IgnoredException.addIgnoredException("10 seconds have elapsed while waiting"); - Host host = Host.getHost(0); - // VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - - // in order to set a small ack-wait-threshold, we have to remove the - // system property established by the dunit harness - String oldAckWait = (String) System.getProperties() - .remove(DistributionConfig.GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD); - - try { - final Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); // loner - props.setProperty(ACK_WAIT_THRESHOLD, "5"); - props.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5"); - props.setProperty(NAME, "putter"); - - getSystem(props); - Region rgn = (new RegionFactory()).setScope(Scope.DISTRIBUTED_ACK) - .setDataPolicy(DataPolicy.REPLICATE).create("testRegion"); - basicGetSystem().getLogWriter().info( - "sec have elapsed while waiting for replies"); - - vm1.invoke(new SerializableRunnable("Connect to distributed system") { - public void run() { - props.setProperty(NAME, "sleeper"); - getSystem(props); - LogWriter log = basicGetSystem().getLogWriter(); - log.info("service failure"); - log.info( - "org.apache.geode.ForcedDisconnectException"); - RegionFactory rf = new RegionFactory(); - Region r = rf.setScope(Scope.DISTRIBUTED_ACK).setDataPolicy(DataPolicy.REPLICATE) - .addCacheListener(getSleepingListener(true)).create("testRegion"); - myCache = r.getCache(); - } - }); - - // now we have two caches set up, each having an alert listener. Vm1 - // also has a cache listener that will turn off its ability to respond - // to "are you dead" messages and then sleep - - rgn.put("bomb", "pow!"); - - - rgn.getCache().close(); - basicGetSystem().getLogWriter().info( - "sec have elapsed while waiting for replies"); - basicGetSystem().disconnect(); - - vm1.invoke(new SerializableRunnable("wait for forced disconnect") { - public void run() { - // wait a while for the DS to finish disconnecting - WaitCriterion ev = new WaitCriterion() { - public boolean done() { - return !basicGetSystem().isConnected(); - } - - public String description() { - return null; - } - }; - // if this fails it means the sick member wasn't kicked out and something is wrong - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - - ev = new WaitCriterion() { - public boolean done() { - return myCache.isClosed(); - } - - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 20 * 1000, 200, false); - - if (!myCache.isClosed()) { - if (basicGetSystem().isConnected()) { - basicGetSystem().disconnect(); - } - myCache = null; - throw new RuntimeException("Test Failed - vm1's cache is not closed"); - } - if (basicGetSystem().isConnected()) { - basicGetSystem().disconnect(); - throw new RuntimeException("Test Failed - vm1's system should have been disconnected"); - } - - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - return regionDestroyedInvoked; - } - - public String description() { - return "vm1's listener should have received afterRegionDestroyed notification"; - } - }; - Wait.waitForCriterion(wc, 30 * 1000, 1000, true); - - } - }); - - } finally { - if (oldAckWait != null) { - System.setProperty(DistributionConfig.GEMFIRE_PREFIX + ACK_WAIT_THRESHOLD, oldAckWait); - } - } - } - - /** - * test use of a bad bind-address for bug #32565 - */ - @Test - public void testBadBindAddress() throws Exception { - disconnectAllFromDS(); - - final Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); // loner - // use a valid address that's not proper for this machine - props.setProperty(BIND_ADDRESS, "www.yahoo.com"); - props.setProperty(ACK_WAIT_THRESHOLD, "5"); - props.setProperty(ACK_SEVERE_ALERT_THRESHOLD, "5"); - try { - getSystem(props); - } catch (IllegalArgumentException e) { - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("caught expected exception (1)", e); - } - // use an invalid address - props.setProperty(BIND_ADDRESS, "bruce.schuchardt"); - try { - getSystem(props); - } catch (IllegalArgumentException e) { - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("caught expected exception (2_", e); - } - // use a valid bind address - props.setProperty(BIND_ADDRESS, InetAddress.getLocalHost().getCanonicalHostName()); - getSystem().disconnect(); - } - - /** - * install a new view and show that waitForViewInstallation works as expected - */ - @Test - public void testWaitForViewInstallation() { - getSystem(new Properties()); - - MembershipManager mgr = basicGetSystem().getDM().getMembershipManager(); - - final NetView v = mgr.getView(); - - final boolean[] passed = new boolean[1]; - Thread t = new Thread("wait for view installation") { - public void run() { - try { - ((ClusterDistributionManager) basicGetSystem().getDM()) - .waitForViewInstallation(v.getViewId() + 1); - synchronized (passed) { - passed[0] = true; - } - } catch (InterruptedException e) { - // failed - } - } - }; - t.setDaemon(true); - t.start(); - - Wait.pause(2000); - - NetView newView = new NetView(v, v.getViewId() + 1); - ((Manager) mgr).installView(newView); - - Wait.pause(2000); - - synchronized (passed) { - Assert.assertTrue(passed[0]); - } - } -} -- To stop receiving notification emails like this one, please contact "commits@geode.apache.org" .