Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4A89018050 for ; Fri, 9 Oct 2015 17:38:56 +0000 (UTC) Received: (qmail 64766 invoked by uid 500); 9 Oct 2015 17:38:51 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 64734 invoked by uid 500); 9 Oct 2015 17:38:51 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 64725 invoked by uid 99); 9 Oct 2015 17:38:51 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2015 17:38:51 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 864511A0754 for ; Fri, 9 Oct 2015 17:38:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.791 X-Spam-Level: * X-Spam-Status: No, score=1.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id JhJbEU57V4re for ; Fri, 9 Oct 2015 17:38:37 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 57524206E8 for ; Fri, 9 Oct 2015 17:38:34 +0000 (UTC) Received: (qmail 64423 invoked by uid 99); 9 Oct 2015 17:38:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2015 17:38:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 751D8DFD7A; Fri, 9 Oct 2015 17:38:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bschuchardt@apache.org To: commits@geode.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-geode git commit: GEODE-77 improved suspect processing, multicast fix for Windows Date: Fri, 9 Oct 2015 17:38:33 +0000 (UTC) Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-77 b6d01474d -> 8de790397 GEODE-77 improved suspect processing, multicast fix for Windows GMSHealthMonitor was allowing the service to watch itself if it ran out of other members to watch. This change set modifies that behavior to set the "next neighbor" to null, causing suspect processing to halt until a member transitions out of "suspect" state or a new membership view is installed. I also found that GMSHealthMonitor wasn't notifying the Manager when a member goes into final-check state, so that membership listeners weren't being notified of suspect events. This was causing a unit test failure. JGroupsMessenger now fools the JGroups stack into thinking an IPv4 stack is being used even if IPv6 is available, so that JGroups will use an IPv4 address. JGroups was selecting an IPv6 address, making communications with IPv4 members impossible and causing a schizm between the GemFire member address and the JGroups address. The JGroups log level is now set to WARN in the default log4j2.xml settings file. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8de79039 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8de79039 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8de79039 Branch: refs/heads/feature/GEODE-77 Commit: 8de790397e2abd0dac9c0c0d347812f540ab625e Parents: b6d0147 Author: Bruce Schuchardt Authored: Fri Oct 9 10:37:26 2015 -0700 Committer: Bruce Schuchardt Committed: Fri Oct 9 10:38:19 2015 -0700 ---------------------------------------------------------------------- .../gemfire/admin/internal/InetAddressUtil.java | 2 +- .../internal/DistributionManager.java | 2 +- .../internal/membership/gms/ServiceConfig.java | 26 ++ .../internal/membership/gms/Services.java | 25 +- .../membership/gms/auth/GMSAuthenticator.java | 20 +- .../membership/gms/fd/GMSHealthMonitor.java | 60 +++-- .../membership/gms/interfaces/Manager.java | 6 - .../membership/gms/interfaces/Service.java | 8 + .../gms/locator/FindCoordinatorResponse.java | 4 +- .../membership/gms/membership/GMSJoinLeave.java | 237 ++++++++++++------- .../gms/messenger/JGroupsMessenger.java | 55 ++++- .../membership/gms/mgr/FakeViewMessage.java | 76 ------ .../gms/mgr/GMSMembershipManager.java | 8 +- .../membership/gms/mgr/LocalViewMessage.java | 76 ++++++ .../internal/tcpserver/TcpServer.java | 2 +- .../admin/remote/ShutdownAllRequest.java | 2 + .../gemfire/internal/tcp/Connection.java | 23 +- .../internal/logging/log4j/log4j2-default.xml | 2 +- .../index/MultiIndexCreationDUnitTest.java | 7 +- .../gemfire/distributed/LocatorDUnitTest.java | 35 ++- .../membership/GMSHealthMonitorJUnitTest.java | 40 ++-- .../messenger/JGroupsMessengerJUnitTest.java | 1 + .../cache/ConnectDisconnectDUnitTest.java | 15 +- .../cache/partitioned/ShutdownAllDUnitTest.java | 12 +- 24 files changed, 482 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/InetAddressUtil.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/InetAddressUtil.java b/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/InetAddressUtil.java index 3decb8d..c0a4bc5 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/InetAddressUtil.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/admin/internal/InetAddressUtil.java @@ -36,7 +36,7 @@ public class InetAddressUtil { public static final InetAddress LOCALHOST = createLocalHost(); public static final String LOOPBACK_ADDRESS = - Boolean.getBoolean("java.net.preferIPv6Addresses") ? "::1" : "127.0.0.1"; + SocketCreator.preferIPv6Addresses() ? "::1" : "127.0.0.1"; public static final InetAddress LOOPBACK = InetAddressUtil.toInetAddress(LOOPBACK_ADDRESS); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java index 33d5b80..64e3965 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java @@ -1149,7 +1149,7 @@ public class DistributionManager membershipManager = MemberFactory.newMembershipManager(l, system.getConfig(), transport, stats); sb.append(System.currentTimeMillis() - start); - sb.append("/"); + this.myid = membershipManager.getLocalMember(); // dc.patchUpAddress(this.myid); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java index d7f722a..dc25ad2 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java @@ -1,5 +1,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms; +import java.net.InetAddress; + import com.gemstone.gemfire.distributed.Locator; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.internal.SocketCreator; @@ -72,6 +74,30 @@ public class ServiceConfig { this.networkPartitionDetectionEnabled = enabled; } + /** + * returns the address that will be used by the DirectChannel to + * identify this member + */ + public InetAddress getInetAddress() { + String bindAddress = this.dconfig.getBindAddress(); + + try { + /* note: had to change the following to make sure the prop wasn't empty + in addition to not null for admin.DistributedSystemFactory */ + if (bindAddress != null && bindAddress.length() > 0) { + return InetAddress.getByName(bindAddress); + + } + else { + return SocketCreator.getLocalHost(); + } + } + catch (java.net.UnknownHostException unhe) { + throw new RuntimeException(unhe); + + } + } + public DistributionConfig getDistributionConfig() { return this.dconfig; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java index ff0a52e..8ab0bbd 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java @@ -11,6 +11,7 @@ import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.InternalLocator; import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.distributed.internal.membership.MembershipManager; import com.gemstone.gemfire.distributed.internal.membership.NetView; import com.gemstone.gemfire.distributed.internal.membership.gms.auth.GMSAuthenticator; @@ -34,7 +35,7 @@ public class Services { private static final Logger logger = LogService.getLogger(); - private static final ThreadGroup threadGroup = LoggingThreadGroup.createThreadGroup("Membership", logger); + private static final ThreadGroup threadGroup = LoggingThreadGroup.createThreadGroup("GemFire Membership", logger); private static InternalLogWriter staticLogWriter; private static InternalLogWriter staticSecurityLogWriter; @@ -55,7 +56,7 @@ public class Services { private InternalLogWriter logWriter; private InternalLogWriter securityLogWriter; - private Timer timer = new Timer("Membership Timer", true); + private Timer timer = new Timer("GemFire Membership Timer", true); @@ -259,6 +260,26 @@ public class Services { messenger.installView(v); manager.installView(v); } + + public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) { + try { + joinLeave.memberSuspected(initiator, suspect); + } finally { + try { + healthMon.memberSuspected(initiator, suspect); + } finally { + try { + auth.memberSuspected(initiator, suspect); + } finally { + try { + messenger.memberSuspected(initiator, suspect); + } finally { + manager.memberSuspected(initiator, suspect); + } + } + } + } + } public Manager getManager() { return manager; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java index c4b822e..f018831 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/auth/GMSAuthenticator.java @@ -43,50 +43,38 @@ public class GMSAuthenticator implements Authenticator { @Override public void start() { - // TODO Auto-generated method stub - } @Override public void started() { - // TODO Auto-generated method stub - } @Override public void stop() { - // TODO Auto-generated method stub - } @Override public void stopped() { - // TODO Auto-generated method stub - } @Override public void installView(NetView v) { - // TODO Auto-generated method stub - } @Override public void beSick() { - // TODO Auto-generated method stub - } @Override public void playDead() { - // TODO Auto-generated method stub - } @Override public void beHealthy() { - // TODO Auto-generated method stub - + } + + @Override + public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) { } /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index f108e49..0b566e3 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -84,6 +84,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval", 200); volatile long currentTimeStamp; + + /** this member's ID */ + private InternalDistributedMember localAddress; final ConcurrentMap memberVsLastMsgTS = new ConcurrentHashMap<>(); final private Map requestIdVsResponse = new ConcurrentHashMap<>(); @@ -128,14 +131,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { */ @Override public void contactedBy(InternalDistributedMember sender) { - if (currentSuspects.remove(sender)) { - logger.info("No longer suspecting {}", sender); - } CustomTimeStamp cTS = new CustomTimeStamp(currentTimeStamp); cTS = memberVsLastMsgTS.putIfAbsent(sender, cTS); if (cTS != null) { cTS.setTimeStamp(currentTimeStamp); } + if (currentSuspects.remove(sender)) { + logger.info("No longer suspecting {}", sender); + setNextNeighbor(currentView, null); + } } /** @@ -175,10 +179,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { @Override public void run() { - InternalDistributedMember neighbour = nextNeighbor; if (GMSHealthMonitor.this.isStopping) { return; } + + InternalDistributedMember neighbour = nextNeighbor; + long currentTime = System.currentTimeMillis(); //this is the start of interval to record member activity GMSHealthMonitor.this.currentTimeStamp = currentTime; @@ -245,11 +251,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { public void run() { boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr); if (!pinged) { - String reason = String.format("Member isn't responding to check message: %s", mbr); + String reason = "Member isn't responding to health checks"; GMSHealthMonitor.this.sendSuspectMessage(mbr, reason); currentSuspects.add(mbr); } else { - logger.trace("Setting next neighbour as member {} has not responded.", mbr); + logger.trace("Setting next neighbor as member {} has not responded.", mbr); currentSuspects.remove(mbr); // back to previous one setNextNeighbor(GMSHealthMonitor.this.currentView, null); @@ -412,7 +418,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { */ private synchronized void setNextNeighbor(NetView newView, InternalDistributedMember nextTo) { if (nextTo == null) { - nextTo = services.getJoinLeave().getMemberID(); + nextTo = localAddress; } boolean sameView = false; @@ -423,6 +429,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } List allMembers = newView.getMembers(); + + Set checkAllSuspected = new HashSet<>(allMembers); + checkAllSuspected.removeAll(currentSuspects); + checkAllSuspected.remove(localAddress); + if (checkAllSuspected.isEmpty() && allMembers.size() > 1) { + logger.info("All other members are suspect at this point"); + nextNeighbor = null; + return; + } + int index = allMembers.indexOf(nextTo); if (index != -1) { int nextNeighborIndex = (index + 1) % allMembers.size(); @@ -437,6 +453,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { nextNeighbor = newNeighbor; } } + + if (nextNeighbor != null && nextNeighbor.equals(localAddress)) { + nextNeighbor = null; + } if (!sameView || memberVsLastMsgTS.size() == 0) { @@ -468,7 +488,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { @Override public void started() { - + this.localAddress = services.getMessenger().getMemberID(); } @Override @@ -516,6 +536,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } @Override + public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) { + } + + @Override public void beSick() { this.beingSick = true; } @@ -578,7 +602,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } // only respond if the intended recipient is this member - InternalDistributedMember me = services.getMessenger().getMemberID(); + InternalDistributedMember me = localAddress; if (me.getVmViewId() < 0 || m.getTarget().equals(me)) { CheckResponseMessage prm = new CheckResponseMessage(m.getRequestId()); prm.setRecipient(m.getSender()); @@ -628,14 +652,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { return; } - InternalDistributedMember localAddress = services.getJoinLeave().getMemberID(); - if (cv.getCoordinator().equals(localAddress)) { for (SuspectRequest req: incomingRequest.getMembers()) { - logger.info("received suspect message from {}: {}", - sender, req.getReason()); + logger.info("received suspect message from {} for {}: {}", + sender, req.getSuspectMember(), req.getReason()); } - doFinalCheck(sMembers, cv, localAddress); + doFinalCheck(sender, sMembers, cv, localAddress); }// coordinator ends else { @@ -659,7 +681,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason()); } - doFinalCheck(smbr, cv, localAddress); + doFinalCheck(sender, smbr, cv, localAddress); } else { recordSuspectRequests(sMembers, cv); } @@ -690,7 +712,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } } - private void doFinalCheck(List sMembers, NetView cv, InternalDistributedMember localAddress) { + private void doFinalCheck(final InternalDistributedMember initiator, + List sMembers, NetView cv, InternalDistributedMember localAddress) { for (int i = 0; i < sMembers.size(); i++) { final SuspectRequest sr = sMembers.get(i); final InternalDistributedMember mbr = sr.getSuspectMember(); @@ -708,18 +731,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { if (view == null || !view.equals(cv)) { final String reason = sr.getReason(); - logger.debug("Doing final check for member {}; reason={}", mbr, reason); + logger.debug("Scheduling final check for member {}; reason={}", mbr, reason); // its a coordinator checkExecutor.execute(new Runnable() { @Override public void run() { try { + services.memberSuspected(initiator, mbr); long startTime = System.currentTimeMillis(); CustomTimeStamp ts = new CustomTimeStamp(startTime); memberVsLastMsgTS.put(mbr, ts); - logger.info("Membership: Doing final check for suspect member {}", mbr); + logger.info("Performing final check for suspect member {} reason={}", mbr, reason); boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr); if (!pinged && !isStopping) { ts = memberVsLastMsgTS.get(mbr); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java index bd9274e..18cd8d4 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java @@ -98,12 +98,6 @@ public interface Manager extends Service, MessageHandler { // void membershipFailure(String message, Exception cause); /** - * used by HealthMonitor to tell Manager that a member is under - * suspicion - */ - void memberSuspected(SuspectMember suspect); - - /** * Indicate whether we are attempting a reconnect */ boolean isReconnectingDS(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java index 082ea0a..86ff500 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Service.java @@ -1,5 +1,6 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.distributed.internal.membership.NetView; import com.gemstone.gemfire.distributed.internal.membership.gms.Services; @@ -55,4 +56,11 @@ public interface Service { * shut down threads, cancel timers, etc. */ void emergencyClose(); + + /** + * a member is suspected of having crashed + * @param initiator + * @param suspect + */ + void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java index 7177d04..5f9576b 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java @@ -86,8 +86,8 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage if (this.isShortForm) { return "FindCoordinatorResponse(coordinator="+coordinator+")"; } else { - return "FindCoordinatorResponse(coordinator="+coordinator+", fromView="+fromView+", viewId="+view.getViewId() - +", registrants=" + registrants.size() + return "FindCoordinatorResponse(coordinator="+coordinator+", fromView="+fromView+", viewId="+(view==null? "nul" : view.getViewId()) + +", registrants=" + (registrants == null? 0 : registrants.size()) +", network partition detection enabled="+this.networkPartitionDetectionEnabled +", locators preferred as coordinators="+this.usePreferredCoordinators+")"; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 3fb86ee..5ed3e88 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -194,66 +194,76 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { */ public boolean join() { - if (Boolean.getBoolean(BYPASS_DISCOVERY)) { - becomeCoordinator(); - return true; - } - - SearchState state = searchState; - - long timeout = services.getConfig().getJoinTimeout(); - logger.debug("join timeout is set to {}", timeout); - long retrySleep = JOIN_RETRY_SLEEP; - long startTime = System.currentTimeMillis(); - long giveupTime = startTime + timeout; - - for (int tries=0; !this.isJoined; tries++) { - - boolean found = findCoordinator(); - if (found) { - logger.debug("found possible coordinator {}", state.possibleCoordinator); - if (localAddress.getNetMember().preferredForCoordinator() - && state.possibleCoordinator.equals(this.localAddress)) { - if (tries > 2 || System.currentTimeMillis() < giveupTime ) { - becomeCoordinator(); - return true; + try { + if (Boolean.getBoolean(BYPASS_DISCOVERY)) { + becomeCoordinator(); + return true; + } + + SearchState state = searchState; + + long timeout = services.getConfig().getJoinTimeout(); + logger.debug("join timeout is set to {}", timeout); + long retrySleep = JOIN_RETRY_SLEEP; + long startTime = System.currentTimeMillis(); + long giveupTime = startTime + timeout; + + for (int tries=0; !this.isJoined; tries++) { + logger.debug("searching for the membership coordinator"); + boolean found = findCoordinator(); + if (found) { + logger.debug("found possible coordinator {}", state.possibleCoordinator); + if (localAddress.getNetMember().preferredForCoordinator() + && state.possibleCoordinator.equals(this.localAddress)) { + if (tries > 2 || System.currentTimeMillis() < giveupTime ) { + becomeCoordinator(); + return true; + } + } else { + if (attemptToJoin()) { + return true; + } + if (System.currentTimeMillis() > giveupTime) { + break; + } + if (!state.possibleCoordinator.equals(localAddress)) { + state.alreadyTried.add(state.possibleCoordinator); + } } } else { - if (attemptToJoin()) { - return true; - } if (System.currentTimeMillis() > giveupTime) { break; } - if (!state.possibleCoordinator.equals(localAddress)) { - state.alreadyTried.add(state.possibleCoordinator); - } } - } else { - if (System.currentTimeMillis() > giveupTime) { - break; + try { + logger.debug("sleeping for {} before making another attempt to find the coordinator", retrySleep); + Thread.sleep(retrySleep); + } catch (InterruptedException e) { + logger.debug("retry sleep interrupted - giving up on joining the distributed system"); + return false; } + } // for + + if (!this.isJoined) { + logger.debug("giving up attempting to join the distributed system after " + (System.currentTimeMillis() - startTime) + "ms"); } - try { - Thread.sleep(retrySleep); - } catch (InterruptedException e) { - logger.debug("retry sleep interrupted - giving up on joining the distributed system"); - return false; + + // to preserve old behavior we need to throw a SystemConnectException if + // unable to contact any of the locators + if (!this.isJoined && state.hasContactedALocator) { + throw new SystemConnectException("Unable to join the distributed system in " + + (System.currentTimeMillis()-startTime) + "ms"); + } + + return this.isJoined; + } finally { + // notify anyone waiting on the address to be completed + if (this.isJoined) { + synchronized(this.localAddress) { + this.localAddress.notifyAll(); + } } - } // for - - if (!this.isJoined) { - logger.debug("giving up attempting to join the distributed system after " + (System.currentTimeMillis() - startTime) + "ms"); - } - - // to preserve old behavior we need to throw a SystemConnectException if - // unable to contact any of the locators - if (!this.isJoined && state.hasContactedALocator) { - throw new SystemConnectException("Unable to join the distributed system in " - + (System.currentTimeMillis()-startTime) + "ms"); } - - return this.isJoined; } /** @@ -605,8 +615,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { recips.addAll(view.getCrashedMembers()); } - logger.info((preparing? "preparing" : "sending") + " new view " + view); - if (preparing) { this.preparedView = view; } else { @@ -614,20 +622,25 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } if (recips.isEmpty()) { + logger.info("no recipients for new view aside from myself"); return true; } + logger.info((preparing? "preparing" : "sending") + " new view " + view); + msg.setRecipients(recips); Set pendingLeaves = getPendingRequestIDs(LEAVE_REQUEST_MESSAGE); Set pendingRemovals = getPendingRequestIDs(REMOVE_MEMBER_REQUEST); + pendingRemovals.removeAll(view.getCrashedMembers()); rp.initialize(id, responders); rp.processPendingRequests(pendingLeaves, pendingRemovals); services.getMessenger().send(msg); // only wait for responses during preparation if (preparing) { -logger.debug("waiting for view responses"); + logger.debug("waiting for view responses"); + Set failedToRespond = rp.waitForResponses(); logger.info("finished waiting for responses to view preparation"); @@ -653,7 +666,7 @@ logger.debug("waiting for view responses"); private void processViewMessage(InstallViewMessage m) { - logger.info("Membership: processing {}", m); + logger.debug("Membership: processing {}", m); NetView view = m.getView(); @@ -744,7 +757,7 @@ logger.debug("waiting for view responses"); true); FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null; if (response != null && response.getCoordinator() != null) { - anyResponses = false; + anyResponses = true; NetView v = response.getView(); int viewId = v == null? -1 : v.getViewId(); if (viewId > state.viewId) { @@ -761,30 +774,7 @@ logger.debug("waiting for view responses"); coordinators.add(response.getCoordinator()); if (!flagsSet) { flagsSet = true; - - boolean enabled = response.isNetworkPartitionDetectionEnabled(); - if (!enabled && services.getConfig().isNetworkPartitionDetectionEnabled()) { - throw new GemFireConfigException("locator at "+addr - +" does not have network-partition-detection enabled but my configuration has it enabled"); - } - - GMSMember mbr = (GMSMember)this.localAddress.getNetMember(); - mbr.setSplitBrainEnabled(enabled); - services.getConfig().setNetworkPartitionDetectionEnabled(enabled); - services.getConfig().getDistributionConfig().setEnableNetworkPartitionDetection(enabled); - - if (response.isUsePreferredCoordinators()) { - this.quorumRequired = true; - logger.debug("The locator indicates that all locators should be preferred as coordinators"); - if (services.getLocator() != null - || Locator.hasLocator() - || !services.getConfig().getDistributionConfig().getStartLocator().isEmpty() - || localAddress.getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { - ((GMSMember)localAddress.getNetMember()).setPreferredForCoordinator(true); - } - } else { - ((GMSMember)localAddress.getNetMember()).setPreferredForCoordinator(true); - } + inheritSettingsFromLocator(addr, response); } } } catch (IOException | ClassNotFoundException problem) { @@ -794,7 +784,7 @@ logger.debug("waiting for view responses"); return false; } if (!anyResponses) { - try { Thread.sleep(2000); } catch (InterruptedException e) { + try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } @@ -903,6 +893,36 @@ logger.debug("waiting for view responses"); } /** + * Some settings are gleaned from locator responses and set into the local + * configuration + */ + private void inheritSettingsFromLocator(InetSocketAddress addr, FindCoordinatorResponse response) { + boolean enabled = response.isNetworkPartitionDetectionEnabled(); + if (!enabled && services.getConfig().isNetworkPartitionDetectionEnabled()) { + throw new GemFireConfigException("locator at "+addr + +" does not have network-partition-detection enabled but my configuration has it enabled"); + } + + GMSMember mbr = (GMSMember)this.localAddress.getNetMember(); + mbr.setSplitBrainEnabled(enabled); + services.getConfig().setNetworkPartitionDetectionEnabled(enabled); + services.getConfig().getDistributionConfig().setEnableNetworkPartitionDetection(enabled); + + if (response.isUsePreferredCoordinators()) { + this.quorumRequired = true; + logger.debug("The locator indicates that all locators should be preferred as coordinators"); + if (services.getLocator() != null + || Locator.hasLocator() + || !services.getConfig().getDistributionConfig().getStartLocator().isEmpty() + || localAddress.getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { + ((GMSMember)localAddress.getNetMember()).setPreferredForCoordinator(true); + } + } else { + ((GMSMember)localAddress.getNetMember()).setPreferredForCoordinator(true); + } + } + + /** * receives a JoinResponse holding a membership view or rejection message * @param rsp */ @@ -1080,7 +1100,8 @@ logger.debug("waiting for view responses"); int oldWeight = currentView.memberWeight(); int failedWeight = newView.getCrashedMemberWeight(currentView); if (failedWeight > 0) { - if (logger.isInfoEnabled()) { + if (logger.isInfoEnabled() + && !newView.getCreator().equals(localAddress)) { // view-creator logs this newView.logCrashedMemberWeights(currentView, logger); } int failurePoint = (int)(Math.round(51 * oldWeight) / 100.0); @@ -1147,8 +1168,12 @@ logger.debug("waiting for view responses"); @Override public void stopped() { - // TODO Auto-generated method stub - + } + + @Override + public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) { + prepareProcessor.memberSuspected(initiator, suspect); + viewProcessor.memberSuspected(initiator, suspect); } @@ -1359,21 +1384,34 @@ logger.debug("waiting for view responses"); // We don't want to mix the two because pending removals // aren't reflected as having crashed in the current view // and need to cause a new view to be generated - notRepliedYet.removeAll(pendingLeaves); synchronized(this.pendingRemovals) { this.pendingRemovals.addAll(pendingRemovals); } } + synchronized void memberSuspected(InternalDistributedMember initiator, + InternalDistributedMember suspect) { + if (waiting) { + // we will do a final check on this member if it hasn't already + // been done, so stop waiting for it now + logger.debug("view response processor recording suspect status for {}", suspect); + pendingRemovals.add(suspect); + checkIfDone(); + } + } + synchronized void processLeaveRequest(InternalDistributedMember mbr) { if (waiting) { + logger.debug("view response processor recording leave request for {}", mbr); stopWaitingFor(mbr); } } synchronized void processRemoveRequest(InternalDistributedMember mbr) { if (waiting) { + logger.debug("view response processor recording remove request for {}", mbr); pendingRemovals.add(mbr); + checkIfDone(); } } @@ -1388,6 +1426,7 @@ logger.debug("waiting for view responses"); this.conflictingView = conflictingView; } + logger.debug("view response processor recording response for {}", sender); stopWaitingFor(sender); } } @@ -1395,11 +1434,18 @@ logger.debug("waiting for view responses"); /** call with synchronized(this) */ private void stopWaitingFor(InternalDistributedMember mbr) { notRepliedYet.remove(mbr); + checkIfDone(); + } + + /** call with synchronized(this) */ + private void checkIfDone() { if (notRepliedYet.isEmpty() || (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) { logger.debug("All anticipated view responses received - notifying waiting thread"); waiting = false; notify(); + } else { + logger.debug("Still waiting for these view replies: {}", notRepliedYet); } } @@ -1529,6 +1575,13 @@ logger.debug("waiting for view responses"); } finally { waiting = false; } + if (viewRequests.size() == 1) { + // start the timer when we have only one request because + // concurrent startup / shutdown of multiple members is + // a common occurrence + okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL; + continue; + } } else { if (System.currentTimeMillis() < okayToCreateView) { // sleep to let more requests arrive @@ -1705,6 +1758,8 @@ logger.debug("waiting for view responses"); } prepared = prepareView(newView, joinReqs); + logger.debug("view preparation phase completed. prepared={}", prepared); + if (prepared) { break; } @@ -1712,12 +1767,14 @@ logger.debug("waiting for view responses"); Set unresponsive = prepareProcessor.getUnresponsiveMembers(); unresponsive.removeAll(removalReqs); unresponsive.removeAll(leaveReqs); - try { - removeHealthyMembers(unresponsive); - } catch (InterruptedException e) { - // abort the view if interrupted - shutdown = true; - return; + if (!unresponsive.isEmpty()) { + try { + removeHealthyMembers(unresponsive); + } catch (InterruptedException e) { + // abort the view if interrupted + shutdown = true; + return; + } } List failures = new ArrayList<>(currentView.getCrashedMembers().size() + unresponsive.size()); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index a345df1..92f9d38 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -11,11 +11,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -31,6 +33,7 @@ import org.jgroups.Event; import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.Message.Flag; +import org.jgroups.Message.TransientFlag; import org.jgroups.Receiver; import org.jgroups.View; import org.jgroups.ViewId; @@ -60,6 +63,7 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.Services; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager; import com.gemstone.gemfire.internal.ClassPathLoader; import com.gemstone.gemfire.internal.HeapDataOutputStream; import com.gemstone.gemfire.internal.OSProcess; @@ -191,7 +195,7 @@ public class JGroupsMessenger implements Messenger { if (transport.isMcastEnabled()) { properties = replaceStrings(properties, "MCAST_PORT", String.valueOf(transport.getMcastId().getPort())); - properties = replaceStrings(properties, "MCAST_ADDRESS", transport.getMcastId().getHost().getHostAddress()); + properties = replaceStrings(properties, "MCAST_ADDRESS", dc.getMcastAddress().getHostAddress()); properties = replaceStrings(properties, "MCAST_TTL", String.valueOf(dc.getMcastTtl())); properties = replaceStrings(properties, "MCAST_SEND_BUFFER_SIZE", String.valueOf(dc.getMcastSendBufferSize())); properties = replaceStrings(properties, "MCAST_RECV_BUFFER_SIZE", String.valueOf(dc.getMcastRecvBufferSize())); @@ -256,6 +260,7 @@ public class JGroupsMessenger implements Messenger { reconnecting = true; } else { + checkForWindowsIPv6(); InputStream is = new ByteArrayInputStream(properties.getBytes("UTF-8")); myChannel = new JChannel(is); } @@ -287,9 +292,25 @@ public class JGroupsMessenger implements Messenger { establishLocalAddress(); - logger.debug("JGroups channel created (took {}ms)", System.currentTimeMillis()-start); + logger.info("JGroups channel created (took {}ms)", System.currentTimeMillis()-start); } + + /** + * JGroups picks an IPv6 address if preferIPv4Stack is false or not set + * and preferIPv6Addresses is not set or is true. We want it to use an + * IPv4 address for a dual-IP stack so that both IPv4 and IPv6 messaging work + */ + private void checkForWindowsIPv6() throws Exception { + boolean isWindows = ((String)System.getProperty("os.name")).indexOf("Windows") >= 0; + boolean preferIpV6Addr = Boolean.getBoolean("java.net.preferIPv6Addresses"); + if (isWindows && !preferIpV6Addr) { + logger.debug("Windows detected - forcing JGroups to think IPv4 is being used so it will choose an IPv4 address"); + Field m = org.jgroups.util.Util.class.getDeclaredField("ip_stack_type"); + m.setAccessible(true); + m.set(null, org.jgroups.util.StackType.IPv4); + } + } @Override public void started() { @@ -312,6 +333,10 @@ public class JGroupsMessenger implements Messenger { } @Override + public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) { + } + + @Override public void installView(NetView v) { this.view = v; @@ -325,6 +350,7 @@ public class JGroupsMessenger implements Messenger { ViewId vid = new ViewId(new JGAddress(v.getCoordinator()), v.getViewId()); View jgv = new View(vid, new ArrayList
(mbrs)); this.jgView = jgv; + logger.trace("installing JGroups view: {}", jgv); this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv)); } @@ -441,6 +467,7 @@ public class JGroupsMessenger implements Messenger { // code to create a versioned input stream, read the sender address, then read the message // and set its sender address DMStats theStats = services.getStatistics(); + NetView oldView = this.view; if (!myChannel.isConnected()) { logger.info("JGroupsMessenger channel is closed - messaging is not possible"); @@ -461,7 +488,7 @@ public class JGroupsMessenger implements Messenger { if (logger.isDebugEnabled()) { String recips = "multicast"; if (!useMcast) { - recips = msg.getRecipients().toString(); + recips = Arrays.toString(msg.getRecipients()); } logger.debug("sending via JGroups: [{}] recipients: {}", msg, recips); } @@ -474,8 +501,10 @@ public class JGroupsMessenger implements Messenger { try { long startSer = theStats.startMsgSerialization(); Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL); + jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK); theStats.endMsgSerialization(startSer); theStats.incSentBytes(jmsg.getLength()); + logger.trace("Sending JGroups message: {}", jmsg); myChannel.send(jmsg); } catch (IllegalArgumentException e) { @@ -560,8 +589,7 @@ public class JGroupsMessenger implements Messenger { Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg; tmp.setDest(to); tmp.setSrc(this.jgAddress); - if (logger.isTraceEnabled()) - logger.trace("Unicasting to {}", to); + logger.trace("Unicasting to {}", to); myChannel.send(tmp); } catch (Exception e) { @@ -596,12 +624,12 @@ public class JGroupsMessenger implements Messenger { return Collections.emptySet(); } Set result = new HashSet(); - NetView newView = services.getJoinLeave().getView(); - if (newView != null) { + NetView newView = this.view; + if (newView != null && newView != oldView) { for (int i = 0; i < destinations.length; i ++) { InternalDistributedMember d = destinations[i]; if (!newView.contains(d)) { - logger.debug("messenger: member has left the view: {}", d); + logger.debug("messenger: member has left the view: {} view is now {}", d, newView); result.add(d); } } @@ -773,6 +801,13 @@ public class JGroupsMessenger implements Messenger { } /** + * returns the JChannel for test verification + */ + public JChannel getJGroupsChannel() { + return this.myChannel; + } + + /** * for unit testing we need to replace UDP with a fake UDP protocol */ public void setJGroupsStackConfigForTesting(String config) { @@ -838,7 +873,7 @@ public class JGroupsMessenger implements Messenger { pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc()); } catch (Exception e) { - logger.info("Failed sending Pong message to " + jgmsg.getSrc()); + logger.info("Failed sending Pong response to " + jgmsg.getSrc()); } return; } else if (pingPonger.isPongMessage(contents)) { @@ -881,7 +916,7 @@ public class JGroupsMessenger implements Messenger { try { if (logger.isTraceEnabled()) { - logger.trace("JGroupsMessenger dispatching {}", msg); + logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender()); } filterIncomingMessage(msg); MessageHandler h = getMessageHandler(msg); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/FakeViewMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/FakeViewMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/FakeViewMessage.java deleted file mode 100755 index 3b63fb6..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/FakeViewMessage.java +++ /dev/null @@ -1,76 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.distributed.internal.membership.gms.mgr; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import com.gemstone.gemfire.distributed.internal.DistributionManager; -import com.gemstone.gemfire.distributed.internal.SerialDistributionMessage; -import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; -import com.gemstone.gemfire.distributed.internal.membership.NetView; - - -/** - ViewMessage is used to pass a new membership view to the GemFire cache - in an orderly manner. It is intended to be queued with serially - executed messages so that the view takes effect at the proper time. - - @author Bruce Schuchardt - */ - -public final class FakeViewMessage extends SerialDistributionMessage -{ - - private GMSMembershipManager manager; - private long viewId; - private NetView view; - - public FakeViewMessage( - InternalDistributedMember addr, - long viewId, - NetView view, - GMSMembershipManager manager - ) - { - super(); - this.sender = addr; - this.viewId = viewId; - this.view = view; - this.manager = manager; - } - - @Override - final public int getProcessorType() { - return DistributionManager.VIEW_EXECUTOR; - } - - - @Override - protected void process(DistributionManager dm) { - //dm.getLogger().info("view message processed", new Exception()); - manager.processView(viewId, view); - } - - // These "messages" are never DataSerialized - - public int getDSFID() { - throw new UnsupportedOperationException(); - } - - @Override - public void toData(DataOutput out) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index 14aa10d..76c3165 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -9,7 +9,6 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.mgr; import java.io.IOException; import java.io.NotSerializableException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1352,7 +1351,7 @@ public class GMSMembershipManager implements MembershipManager, Manager logger.trace(LogMarker.DM_VIEWS, "Membership: queuing new view for processing, id = {}, view = {}", newId, newView); } - FakeViewMessage v = new FakeViewMessage(address, newId, newView, + LocalViewMessage v = new LocalViewMessage(address, newId, newView, GMSMembershipManager.this); listener.messageReceived(v); @@ -1362,8 +1361,9 @@ public class GMSMembershipManager implements MembershipManager, Manager } @Override - public void memberSuspected(SuspectMember suspect) { - handleOrDeferSuspect(suspect); + public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) { + SuspectMember s = new SuspectMember(initiator, suspect); + handleOrDeferSuspect(s); } /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/LocalViewMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/LocalViewMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/LocalViewMessage.java new file mode 100755 index 0000000..cd20b65 --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/LocalViewMessage.java @@ -0,0 +1,76 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.distributed.internal.membership.gms.mgr; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.distributed.internal.SerialDistributionMessage; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.distributed.internal.membership.NetView; + + +/** + LocalViewMessage is used to pass a new membership view to the GemFire cache + in an orderly manner. It is intended to be queued with serially + executed messages so that the view takes effect at the proper time. + + @author Bruce Schuchardt + */ + +public final class LocalViewMessage extends SerialDistributionMessage +{ + + private GMSMembershipManager manager; + private long viewId; + private NetView view; + + public LocalViewMessage( + InternalDistributedMember addr, + long viewId, + NetView view, + GMSMembershipManager manager + ) + { + super(); + this.sender = addr; + this.viewId = viewId; + this.view = view; + this.manager = manager; + } + + @Override + final public int getProcessorType() { + return DistributionManager.VIEW_EXECUTOR; + } + + + @Override + protected void process(DistributionManager dm) { + //dm.getLogger().info("view message processed", new Exception()); + manager.processView(viewId, view); + } + + // These "messages" are never DataSerialized + + public int getDSFID() { + throw new UnsupportedOperationException(); + } + + @Override + public void toData(DataOutput out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java index a859ba6..235c2b9 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java @@ -70,7 +70,7 @@ public class TcpServer { * with the addition of support for all old versions of clients you can no * longer change this version number */ - public final static int GOSSIPVERSION = 1003; + public final static int GOSSIPVERSION = 1002; // Don't change it ever. We did NOT send GemFire version in a Gossip request till 1001 version. // This GOSSIPVERSION is used in _getVersionForAddress request for getting GemFire version of a GossipServer. public final static int OLDGOSSIPVERSION = 1001; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllRequest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllRequest.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllRequest.java index 242f969..18d0daf 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllRequest.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllRequest.java @@ -258,6 +258,7 @@ public class ShutdownAllRequest extends AdminRequest { } if(msg instanceof ShutdownAllResponse) { if (((ShutdownAllResponse)msg).isToShutDown()) { + logger.debug("{} adding {} to result set {}", this, msg.getSender(), results); results.add(msg.getSender()); } else { @@ -284,6 +285,7 @@ public class ShutdownAllRequest extends AdminRequest { } public Set getResults() { + logger.debug("{} shutdownAll returning {}", this, results, new Exception("stack trace")); return results; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java index eaa1b56..2e903f7 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java @@ -858,24 +858,31 @@ public class Connection implements Runnable { private static final int CONNECT_HANDSHAKE_SIZE = 4096; - private void handshakeNio() throws IOException { - // We jump through some extra hoops to use a MsgOutputStream - // This keeps us from allocating an extra DirectByteBuffer. + /** + * waits until we've joined the distributed system + * before returning + */ + private void waitForAddressCompletion() { InternalDistributedMember myAddr = this.owner.getConduit().getLocalAddress(); synchronized (myAddr) { - while (myAddr.getInetAddress() == null) { + while ((owner.getConduit().getCancelCriterion().cancelInProgress() == null) + && myAddr.getInetAddress() == null && myAddr.getVmViewId() < 0) { try { - myAddr.wait(); // spurious wakeup ok + myAddr.wait(100); // spurious wakeup ok } catch (InterruptedException ie) { Thread.currentThread().interrupt(); this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie); } } + Assert.assertTrue(myAddr.getDirectChannelPort() == this.owner.getConduit().getPort()); } + } - Assert.assertTrue(myAddr.getDirectChannelPort() == this.owner.getConduit().getPort()); - + private void handshakeNio() throws IOException { + waitForAddressCompletion(); + + InternalDistributedMember myAddr = this.owner.getConduit().getLocalAddress(); final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE); //connectHandshake.reset(); @@ -911,6 +918,8 @@ public class Connection implements Runnable { } private void handshakeStream() throws IOException { + waitForAddressCompletion(); + //this.output = new BufferedOutputStream(getSocket().getOutputStream(), owner.getConduit().bufferSize); this.output = getSocket().getOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/main/resources/com/gemstone/gemfire/internal/logging/log4j/log4j2-default.xml ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/resources/com/gemstone/gemfire/internal/logging/log4j/log4j2-default.xml b/gemfire-core/src/main/resources/com/gemstone/gemfire/internal/logging/log4j/log4j2-default.xml index e383d0a..5f89760 100644 --- a/gemfire-core/src/main/resources/com/gemstone/gemfire/internal/logging/log4j/log4j2-default.xml +++ b/gemfire-core/src/main/resources/com/gemstone/gemfire/internal/logging/log4j/log4j2-default.xml @@ -14,7 +14,7 @@ - + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/MultiIndexCreationDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/MultiIndexCreationDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/MultiIndexCreationDUnitTest.java index c3023e7..4bc1fa6 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/MultiIndexCreationDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/MultiIndexCreationDUnitTest.java @@ -70,10 +70,12 @@ public class MultiIndexCreationDUnitTest extends CacheTestCase { AsyncInvocation a2 = server1.invokeAsync(new SerializableCallable("Create Server1") { @Override public Object call() throws Exception { - while (!hooked){ + long giveupTime = System.currentTimeMillis() + 60000; + while (!hooked && System.currentTimeMillis() < giveupTime) { getLogWriter().info("Query Waiting for index hook."); pause(100); } + assertTrue(hooked); QueryObserver old = QueryObserverHolder .setInstance(new QueryObserverAdapter() { @@ -163,10 +165,11 @@ public class MultiIndexCreationDUnitTest extends CacheTestCase { @Override public void hook(int spot) throws RuntimeException { + long giveupTime = System.currentTimeMillis() + 60000; if (spot == 13) { hooked = true; getLogWriter().info("MultiIndexCreationTestHook is hooked in create defined indexes."); - while (hooked) { + while (hooked && System.currentTimeMillis() < giveupTime) { getLogWriter().info("MultiIndexCreationTestHook waiting."); pause(100); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java index f441023..cf936a4 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java @@ -65,11 +65,6 @@ public class LocatorDUnitTest extends DistributedTestCase { super(name); } - private static final String WAIT1_MS_NAME = "LocatorDUnitTest.WAIT1_MS"; - private static final int WAIT1_MS_DEFAULT = 40000; // 5000 -- see bug 36470 - private static final int WAIT1_MS - = Integer.getInteger(WAIT1_MS_NAME, WAIT1_MS_DEFAULT).intValue(); - private static final String WAIT2_MS_NAME = "LocatorDUnitTest.WAIT2_MS"; private static final int WAIT2_MS_DEFAULT = 5000; // 2000 -- see bug 36470 private static final int WAIT2_MS @@ -127,7 +122,6 @@ public class LocatorDUnitTest extends DistributedTestCase { properties.put("mcast-port", "0"); properties.put("start-locator", locators); properties.put("log-level", getDUnitLogLevel()); -// properties.put("log-level", getDUnitLogLevel()); properties.put("security-peer-auth-init","com.gemstone.gemfire.distributed.AuthInitializer.create"); properties.put("security-peer-authenticator","com.gemstone.gemfire.distributed.MyAuthenticator.create"); properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); @@ -532,7 +526,7 @@ public class LocatorDUnitTest extends DistributedTestCase { assertTrue(MembershipManagerHelper.getLeadMember(sys) == null); - properties.put("log-level", getDUnitLogLevel()); +// properties.put("log-level", getDUnitLogLevel()); DistributedMember mem1 = (DistributedMember)vm1.invoke(this.getClass(), "getDistributedMember", connectArgs); @@ -1042,6 +1036,7 @@ public class LocatorDUnitTest extends DistributedTestCase { bgexecLogger.info(addExpected); boolean exceptionOccurred = true; + String oldValue = (String)System.getProperties().put("p2p.joinTimeout", "15000"); try { DistributedSystem.connect(props); exceptionOccurred = false; @@ -1059,6 +1054,11 @@ public class LocatorDUnitTest extends DistributedTestCase { fail("Failed with unexpected exception", ex); } finally { + if (oldValue == null) { + System.getProperties().remove("p2p.joinTimeout"); + } else { + System.getProperties().put("p2p.joinTimeout", oldValue); + } bgexecLogger.info(removeExpected); } @@ -1164,6 +1164,14 @@ public class LocatorDUnitTest extends DistributedTestCase { } } + public void testRepeat() throws Exception { + for (int i=0; i<10; i++) { + System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> run #"+i); + testLocatorBecomesCoordinator(); + tearDown(); + setUp(); + } + } /** * Tests starting one locator in a remote VM and having multiple * members of the distributed system join it. This ensures that @@ -1449,6 +1457,8 @@ public class LocatorDUnitTest extends DistributedTestCase { props.setProperty("mcast-port", String.valueOf(mcastport)); props.setProperty("locators", locators); props.setProperty("log-level", getDUnitLogLevel()); + props.setProperty("mcast-ttl", "0"); + props.setProperty("enable-network-partition-detection", "true"); props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); Locator.startLocatorAndDS(port1, logFile, null, props); @@ -1466,6 +1476,8 @@ public class LocatorDUnitTest extends DistributedTestCase { props.setProperty("mcast-port", String.valueOf(mcastport)); props.setProperty("locators", locators); props.setProperty("log-level", getDUnitLogLevel()); + props.setProperty("mcast-ttl", "0"); + props.setProperty("enable-network-partition-detection", "true"); props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); Locator.startLocatorAndDS(port2, logFile, null, props); } @@ -1481,6 +1493,9 @@ public class LocatorDUnitTest extends DistributedTestCase { Properties props = new Properties(); props.setProperty("mcast-port", String.valueOf(mcastport)); props.setProperty("locators", locators); + props.setProperty("log-level", getDUnitLogLevel()); + props.setProperty("mcast-ttl", "0"); + props.setProperty("enable-network-partition-detection", "true"); DistributedSystem.connect(props); } }; @@ -1491,6 +1506,9 @@ public class LocatorDUnitTest extends DistributedTestCase { Properties props = new Properties(); props.setProperty("mcast-port", String.valueOf(mcastport)); props.setProperty("locators", locators); + props.setProperty("log-level", getDUnitLogLevel()); + props.setProperty("mcast-ttl", "0"); + props.setProperty("enable-network-partition-detection", "true"); system = (InternalDistributedSystem)DistributedSystem.connect(props); WaitCriterion ev = new WaitCriterion() { @@ -1504,7 +1522,7 @@ public class LocatorDUnitTest extends DistributedTestCase { return false; // NOTREACHED } public String description() { - return null; + return "waiting for 5 members - have " + system.getDM().getViewMembers().size(); } }; DistributedTestCase.waitForCriterion(ev, WAIT2_MS, 200, true); @@ -1756,6 +1774,7 @@ public class LocatorDUnitTest extends DistributedTestCase { /** return the distributed member id for the ds on this vm */ public static DistributedMember getDistributedMember(Properties props) { + props.put("name", "vm_"+VM.getCurrentVMNum()); DistributedSystem sys = DistributedSystem.connect(props); sys.getLogWriter().info("service failure"); sys.getLogWriter().info("com.gemstone.gemfire.ConnectException"); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java index bc75d43..ae77b81 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java @@ -94,6 +94,7 @@ public class GMSHealthMonitorJUnitTest { InternalDistributedMember mbr = new InternalDistributedMember(SocketCreator.getLocalHost(), 12345); when(messenger.getMemberID()).thenReturn(mbr); when(messenger.send(any(CheckResponseMessage.class))).thenAnswer(messageSent); + gmsHealthMonitor.started(); gmsHealthMonitor.processMessage(new CheckRequestMessage(mbr, 1)); Assert.assertTrue("Check Response should have been sent", messageSent.isMethodExecuted()); @@ -108,11 +109,12 @@ public class GMSHealthMonitorJUnitTest { NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); - when(services.getJoinLeave().getMemberID()).thenAnswer(messageSent); + when(services.getMessenger().getMemberID()).thenAnswer(messageSent); + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); - Assert.assertTrue("It should have got memberID from services.getJoinLeave().getMemberID()", messageSent.isMethodExecuted()); + Assert.assertTrue("It should have got memberID from services.getMessenger().getMemberID()", messageSent.isMethodExecuted()); } /** @@ -123,7 +125,8 @@ public class GMSHealthMonitorJUnitTest { NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); @@ -139,7 +142,8 @@ public class GMSHealthMonitorJUnitTest { // System.out.printf("memberID is %s view is %s\n", mockMembers.get(3), v); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); @@ -154,7 +158,8 @@ public class GMSHealthMonitorJUnitTest { } // neighbor should change to 5th - Assert.assertEquals(mockMembers.get(5), neighbor); + Assert.assertEquals("expected " + mockMembers.get(5) + " but found " + neighbor + + ". view="+v, mockMembers.get(5), neighbor); } /** @@ -167,7 +172,8 @@ public class GMSHealthMonitorJUnitTest { NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); @@ -189,7 +195,8 @@ public class GMSHealthMonitorJUnitTest { MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); @@ -215,7 +222,7 @@ public class GMSHealthMonitorJUnitTest { MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); gmsHealthMonitor.installView(v); @@ -240,7 +247,7 @@ public class GMSHealthMonitorJUnitTest { MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); gmsHealthMonitor.installView(v); @@ -263,7 +270,7 @@ public class GMSHealthMonitorJUnitTest { MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); gmsHealthMonitor.installView(v); @@ -290,7 +297,8 @@ public class GMSHealthMonitorJUnitTest { MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member + when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); @@ -325,7 +333,8 @@ public class GMSHealthMonitorJUnitTest { MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member + when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); @@ -361,7 +370,8 @@ public class GMSHealthMonitorJUnitTest { MethodExecuted messageSent = new MethodExecuted(); // preferred coordinators are 0 and 1 - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(1));// next preferred coordinator + when(messenger.getMemberID()).thenReturn(mockMembers.get(1));// next preferred coordinator + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); @@ -396,7 +406,7 @@ public class GMSHealthMonitorJUnitTest { NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); gmsHealthMonitor.installView(v); @@ -417,7 +427,7 @@ public class GMSHealthMonitorJUnitTest { MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member - when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); + when(messenger.getMemberID()).thenReturn(mockMembers.get(3)); gmsHealthMonitor.installView(v); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java index 83c1419..e803f28 100755 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java @@ -72,6 +72,7 @@ public class JGroupsMessengerJUnitTest { when(stopper.isCancelInProgress()).thenReturn(false); manager = mock(Manager.class); + when(manager.isMulticastAllowed()).thenReturn(enableMcast); joinLeave = mock(JoinLeave.class); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConnectDisconnectDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConnectDisconnectDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConnectDisconnectDUnitTest.java old mode 100644 new mode 100755 index 44ee0a1..fc7ab03 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConnectDisconnectDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ConnectDisconnectDUnitTest.java @@ -1,19 +1,33 @@ package com.gemstone.gemfire.internal.cache; import java.io.File; +import java.io.IOException; +import java.net.InetAddress; import java.util.List; import java.util.Properties; +import org.jgroups.protocols.UDP; + import com.gemstone.gemfire.cache.TimeoutException; import com.gemstone.gemfire.cache30.CacheSerializableRunnable; import com.gemstone.gemfire.cache30.CacheTestCase; +import com.gemstone.gemfire.distributed.DistributedSystem; +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.membership.MembershipManager; +import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManagerHelper; +import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger; +import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager; import com.gemstone.gemfire.internal.AvailablePort; import com.gemstone.gemfire.internal.AvailablePortHelper; import dunit.AsyncInvocation; +import dunit.DistributedTestCase; import dunit.Host; import dunit.SerializableRunnable; import dunit.VM; +import dunit.DistributedTestCase.WaitCriterion; /** A test of 46438 - missing response to an update attributes message */ public class ConnectDisconnectDUnitTest extends CacheTestCase { @@ -30,7 +44,6 @@ public class ConnectDisconnectDUnitTest extends CacheTestCase { } - // see bugs #50785 and #46438 public void testManyConnectsAndDisconnects() throws Throwable { // invokeInEveryVM(new SerializableRunnable() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8de79039/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/ShutdownAllDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/ShutdownAllDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/ShutdownAllDUnitTest.java index e08b507..a61648c 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/ShutdownAllDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/ShutdownAllDUnitTest.java @@ -502,6 +502,16 @@ public class ShutdownAllDUnitTest extends CacheTestCase { checkData(vm0, numBuckets, 113, "b", "region"); } + +// public void testRepeat() throws Throwable { +// for (int i=0; i<10; i++) { +// System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> run #"+i); +// testShutdownAllWithMembersWaiting(); +// tearDown(); +// setUp(); +// } +// } + /** * Test for 43551. Do a shutdown all with some * members waiting on recovery. @@ -532,7 +542,7 @@ public class ShutdownAllDUnitTest extends CacheTestCase { AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1); //Wait a bit for the initialization to get stuck - pause(5000); + pause(20000); assertTrue(a0.isAlive()); //Do another shutdown all, with a member offline and another stuck