Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44B2610470 for ; Mon, 14 Sep 2015 15:44:03 +0000 (UTC) Received: (qmail 6093 invoked by uid 500); 14 Sep 2015 15:44:03 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 6062 invoked by uid 500); 14 Sep 2015 15:44:03 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 6053 invoked by uid 99); 14 Sep 2015 15:44:03 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Sep 2015 15:44:03 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 941E7C0429 for ; Mon, 14 Sep 2015 15:44:02 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.775 X-Spam-Level: * X-Spam-Status: No, score=1.775 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id w7bkpKi_-fWz for ; Mon, 14 Sep 2015 15:43:55 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 8C76A42B36 for ; Mon, 14 Sep 2015 15:43:54 +0000 (UTC) Received: (qmail 5531 invoked by uid 99); 14 Sep 2015 15:43:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Sep 2015 15:43:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E6AD3E0414; Mon, 14 Sep 2015 15:43:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bschuchardt@apache.org To: commits@geode.incubator.apache.org Message-Id: <159f521498bc4f99afa36cedb58c9816@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-geode git commit: GEODE-77: GMSJoinLeave now collects removal requests to see if it should become the membership coordinator Date: Mon, 14 Sep 2015 15:43:53 +0000 (UTC) Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-77 1553ddcd0 -> 972b60564 GEODE-77: GMSJoinLeave now collects removal requests to see if it should become the membership coordinator Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/972b6056 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/972b6056 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/972b6056 Branch: refs/heads/feature/GEODE-77 Commit: 972b60564a042ebc5e6f074b165d1b23cdda1c61 Parents: 1553ddc Author: Bruce Schuchardt Authored: Mon Sep 14 08:43:16 2015 -0700 Committer: Bruce Schuchardt Committed: Mon Sep 14 08:43:16 2015 -0700 ---------------------------------------------------------------------- .../internal/DistributionManager.java | 8 +-- .../internal/membership/NetView.java | 56 +++++++++++--------- .../membership/gms/fd/GMSHealthMonitor.java | 33 ++++++++---- .../membership/gms/membership/GMSJoinLeave.java | 52 +++++++++++++----- .../gms/messages/LeaveRequestMessage.java | 8 +++ .../gms/mgr/GMSMembershipManager.java | 2 +- .../gemfire/internal/tcp/Connection.java | 12 +++++ .../membership/MembershipJUnitTest.java | 8 +-- .../membership/GMSHealthMonitorJUnitTest.java | 27 +++++----- .../gms/membership/GMSJoinLeaveJUnitTest.java | 46 ++++++++-------- 10 files changed, 156 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java index a6a425b..ef8fcb1 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java @@ -3395,8 +3395,8 @@ public class DistributionManager // message.setRecipient(DistributionManager.this.getId()); - if (logger.isTraceEnabled(LogMarker.DM)) { - logger.trace(LogMarker.DM, "Received message '{}' from <{}>", message, message.getSender()); + if (logger.isTraceEnabled()) { + logger.trace("Received message '{}' from <{}>", message, message.getSender()); } scheduleIncomingMessage(message); } @@ -3585,8 +3585,8 @@ public class DistributionManager m.setRecipients(allOthers); //Address recipient = (Address) m.getRecipient(); - if (logger.isTraceEnabled(LogMarker.DM)) { - logger.trace(LogMarker.DM, "{} Sending {} to {}", this.getDistributionManagerId(), m, m.getRecipientsDescription()); + if (logger.isTraceEnabled()) { + logger.trace("{} Sending {} to {}", this.getDistributionManagerId(), m, m.getRecipientsDescription()); } try { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java index 0f893c3..60be680 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java @@ -37,24 +37,21 @@ import com.gemstone.gemfire.internal.Version; */ public class NetView implements DataSerializableFixedID { private static final long serialVersionUID = -8888347937416039434L; + private int viewId; private List members; - private List shutdownMembers; - private List crashedMembers; + private Set shutdownMembers; + private Set crashedMembers; private InternalDistributedMember creator; private Set hashedMembers; static final private Random rd = new Random(); - // TODO:need to clear this - /** membership logger */ - private static final Logger logger = Services.getLogger(); - public NetView() { viewId = 0; members = new ArrayList(4); - this.hashedMembers = new HashSet(members); - shutdownMembers = Collections.EMPTY_LIST; - crashedMembers = Collections.EMPTY_LIST; + this.hashedMembers = new HashSet<>(members); + shutdownMembers = Collections.emptySet(); + crashedMembers = new HashSet<>(); creator = null; } @@ -62,9 +59,9 @@ public class NetView implements DataSerializableFixedID { viewId = 0; members = new ArrayList(4); members.add(creator); - this.hashedMembers = new HashSet(members); - shutdownMembers = Collections.EMPTY_LIST; - crashedMembers = Collections.EMPTY_LIST; + this.hashedMembers = new HashSet<>(members); + shutdownMembers = new HashSet<>(); + crashedMembers = Collections.emptySet(); this.creator = creator; int seed = creator.hashCode() + (int) System.currentTimeMillis(); } @@ -74,8 +71,8 @@ public class NetView implements DataSerializableFixedID { this.viewId = (int) viewId; members = new ArrayList(size); this.hashedMembers = new HashSet(members); - shutdownMembers = Collections.EMPTY_LIST; - crashedMembers = Collections.EMPTY_LIST; + shutdownMembers = new HashSet<>(); + crashedMembers = Collections.emptySet(); creator = null; } @@ -84,12 +81,12 @@ public class NetView implements DataSerializableFixedID { this.viewId = viewId; this.members = new ArrayList(other.members); this.hashedMembers = new HashSet(other.members); - this.shutdownMembers = new ArrayList(other.shutdownMembers); - this.crashedMembers = new ArrayList(other.crashedMembers); + this.shutdownMembers = new HashSet(other.shutdownMembers); + this.crashedMembers = new HashSet(other.crashedMembers); } - public NetView(InternalDistributedMember creator, int viewId, List mbrs, List shutdowns, - List crashes) { + public NetView(InternalDistributedMember creator, int viewId, List mbrs, Set shutdowns, + Set crashes) { this.creator = creator; this.viewId = viewId; this.members = mbrs; @@ -144,11 +141,19 @@ public class NetView implements DataSerializableFixedID { this.hashedMembers.add(mbr); this.members.add(mbr); } + + public void addCrashedMembers(Set mbr) { + this.crashedMembers.addAll(mbr); + } public boolean remove(InternalDistributedMember mbr) { this.hashedMembers.remove(mbr); return this.members.remove(mbr); } + + public void removeAll(Collection ids) { + this.members.removeAll(ids); + } public boolean contains(InternalDistributedMember mbr) { return this.hashedMembers.contains(mbr); @@ -245,11 +250,11 @@ public class NetView implements DataSerializableFixedID { return results; } - public List getShutdownMembers() { + public Set getShutdownMembers() { return this.shutdownMembers; } - public List getCrashedMembers() { + public Set getCrashedMembers() { return this.crashedMembers; } @@ -332,8 +337,7 @@ public class NetView implements DataSerializableFixedID { * not counted */ public List getActualCrashedMembers(NetView oldView) { - List result = new ArrayList(this.crashedMembers.size()); - InternalDistributedMember lead = oldView.getLeadMember(); + List result = new ArrayList<>(this.crashedMembers.size()); for (InternalDistributedMember mbr : this.crashedMembers) { if ((mbr.getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE) && oldView.contains(mbr)) { result.add(mbr); @@ -433,8 +437,8 @@ public class NetView implements DataSerializableFixedID { DataSerializer.writeObject(creator, out); out.writeInt(viewId); writeAsArrayList(members, out); - writeAsArrayList(shutdownMembers, out); - writeAsArrayList(crashedMembers, out); + InternalDataSerializer.writeSet(shutdownMembers, out); + InternalDataSerializer.writeSet(crashedMembers, out); } @Override @@ -443,8 +447,8 @@ public class NetView implements DataSerializableFixedID { viewId = in.readInt(); members = DataSerializer.readArrayList(in); this.hashedMembers = new HashSet(members); - shutdownMembers = DataSerializer.readArrayList(in); - crashedMembers = DataSerializer.readArrayList(in); + shutdownMembers = InternalDataSerializer.readHashSet(in); + crashedMembers = InternalDataSerializer.readHashSet(in); } /** this will deserialize as an ArrayList */ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index d0822d0..22a6633 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -99,6 +99,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { * to stop check scheduler */ private ScheduledFuture monitorFuture; + + /** test hook */ + boolean playingDead = false; + + /** test hook */ + boolean beingSick = false; public GMSHealthMonitor() { @@ -167,7 +173,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { if (nextNeighbourTS == null) { CustomTimeStamp customTS = new CustomTimeStamp(); - customTS.setTimeStamp(System.currentTimeMillis()); + customTS.setTimeStamp(currentTime); memberVsLastMsgTS.put(neighbour, customTS); return; } @@ -265,9 +271,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { if (pingResp.getResponseMsg() == null) { pingResp.wait(services.getConfig().getMemberTimeout()); } + CustomTimeStamp ts = memberVsLastMsgTS.get(pingMember); if (pingResp.getResponseMsg() == null) { - // double check the activity log - CustomTimeStamp ts = memberVsLastMsgTS.get(pingMember); + // double check the activity map if (ts != null && ts.getTimeStamp() > (System.currentTimeMillis() - services.getConfig().getMemberTimeout())) { @@ -275,6 +281,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } return false; } else { + if (ts != null) { + ts.setTimeStamp(System.currentTimeMillis()); + } return true; } } @@ -314,10 +323,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { return pinged; } - public void playDead(boolean b) { - - } - public void start() { { scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() { @@ -475,17 +480,18 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { @Override public void beSick() { - + this.beingSick = true; } @Override public void playDead() { - + this.playingDead = true; } @Override public void beHealthy() { - + this.beingSick = false; + this.playingDead = false; } @Override @@ -517,6 +523,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { } private void processPingRequest(PingRequestMessage m) { + + if (beingSick || playingDead) { + return; + } + // only respond if the intended recipient is this member InternalDistributedMember me = services.getMessenger().getMemberID(); if (me.getVmViewId() < 0 || m.getTarget().equals(me)) { @@ -645,7 +656,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler { @Override public void run() { try { - logger.debug("Doing final check for member {}", mbr); + logger.info("Membership: Doing final check for suspect member {}", mbr); boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr); if (!pinged) { GMSHealthMonitor.this.services.getJoinLeave().remove(mbr, reason); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index b248f1b..af99530 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -1,6 +1,5 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.membership; -import static com.gemstone.gemfire.distributed.internal.DistributionManager.LOCATOR_DM_TYPE; import static com.gemstone.gemfire.internal.DataSerializableFixedID.INSTALL_VIEW_MESSAGE; import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST; import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE; @@ -110,6 +109,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /** the currently installed view */ private volatile NetView currentView; + private final Set removedMembers = new HashSet<>(); + /** a new view being installed */ private NetView preparedView; @@ -328,6 +329,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.debug("JoinLeave is checking to see if I should become coordinator"); NetView check = new NetView(v, v.getViewId()+1); check.remove(incomingRequest.getMemberID()); + synchronized(removedMembers) { + check.removeAll(removedMembers); + check.addCrashedMembers(removedMembers); + } if (check.getCoordinator().equals(localAddress)) { becomeCoordinator(incomingRequest.getMemberID()); } @@ -433,16 +438,18 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { NetView newView; synchronized(viewInstallationLock) { int viewNumber = currentView.getViewId() + 5; - List mbrs = new ArrayList(currentView.getMembers()); + List mbrs = new ArrayList<>(currentView.getMembers()); if (!mbrs.contains(localAddress)) { mbrs.add(localAddress); } - List leaving = new ArrayList(); + Set leaving = new HashSet<>(); if (oldCoordinator != null) { leaving.add(oldCoordinator); } - newView = new NetView(this.localAddress, viewNumber, mbrs, leaving, - Collections.emptyList()); + synchronized(this.removedMembers) { + newView = new NetView(this.localAddress, viewNumber, mbrs, leaving, + this.removedMembers); + } } sendView(newView, Collections.emptyList()); startCoordinatorServices(); @@ -460,10 +467,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } - private void sendRemoveMessages(List newMbrs, + private void sendRemoveMessages(Set removals, List reasons, NetView newView) { Iterator reason = reasons.iterator(); - for (InternalDistributedMember mbr: newMbrs) { + for (InternalDistributedMember mbr: removals) { RemoveMemberMessage response = new RemoveMemberMessage(mbr, mbr, reason.next()); services.getMessenger().send(response); } @@ -535,6 +542,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void processViewMessage(InstallViewMessage m) { + + logger.info("Membership: processing {}", m); + NetView view = m.getView(); if (currentView != null && view.getViewId() < currentView.getViewId()) { @@ -732,6 +742,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } + synchronized(removedMembers) { + removedMembers.clear(); + } } /** @@ -868,8 +881,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { - logger.debug("JoinLeave sending a leave request to {}", view.getCoordinator()); - LeaveRequestMessage m = new LeaveRequestMessage(view.getCoordinator(), this.localAddress, "this member is shutting down"); + List coords = view.getPreferredCoordinators(Collections.emptySet(), localAddress, 5); + + logger.debug("JoinLeave sending a leave request to {}", coords); + LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down"); services.getMessenger().send(m); } } // view.size @@ -896,6 +911,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { msg.setSender(this.localAddress); processRemoveRequest(msg); } else { + final NetView check; + synchronized(removedMembers) { + removedMembers.add(m); + check = new NetView(v, v.getViewId()); + check.addCrashedMembers(removedMembers); + check.removeAll(removedMembers); + } + if (check.getCoordinator().equals(this.localAddress)) { + becomeCoordinator(v.getCoordinator()); + } services.getMessenger().send(msg); } } @@ -903,8 +928,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { @Override public void memberShutdown(DistributedMember mbr, String reason) { + if (this.isCoordinator) { - LeaveRequestMessage msg = new LeaveRequestMessage(this.localAddress, (InternalDistributedMember)mbr, reason); + LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember)mbr, reason); recordViewRequest(msg); } } @@ -1156,9 +1182,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * Returns false if the view cannot be prepared successfully, true otherwise */ void createAndSendView(List requests) { - List joinReqs = new ArrayList(); - List leaveReqs = new ArrayList(); - List removalReqs = new ArrayList(); + List joinReqs = new ArrayList<>(); + Set leaveReqs = new HashSet<>(); + Set removalReqs = new HashSet<>(); List removalReasons = new ArrayList(); NetView oldView = currentView; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/LeaveRequestMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/LeaveRequestMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/LeaveRequestMessage.java index 4cadf19..df91a64 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/LeaveRequestMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/LeaveRequestMessage.java @@ -3,6 +3,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messages; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Collection; import com.gemstone.gemfire.DataSerializer; import com.gemstone.gemfire.distributed.internal.DistributionManager; @@ -14,6 +15,13 @@ public class LeaveRequestMessage extends HighPriorityDistributionMessage { private InternalDistributedMember memberID; private String reason; + public LeaveRequestMessage(Collection coords, InternalDistributedMember id, String reason) { + super(); + setRecipients(coords); + this.memberID = id; + this.reason = reason; + } + public LeaveRequestMessage(InternalDistributedMember coord, InternalDistributedMember id, String reason) { super(); setRecipient(coord); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index 418b82b..200fd85 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -1880,7 +1880,7 @@ public class GMSMembershipManager implements MembershipManager, Manager } public void suspectMember(DistributedMember mbr, String reason) { - if (mbr != null) { + if (mbr != null && !this.shutdownMembers.containsKey(mbr)) { this.services.getHealthMonitor().suspect((InternalDistributedMember)mbr, reason); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java index 13b1651..21286fc 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java @@ -1869,6 +1869,7 @@ public class Connection implements Runnable { } } } + initiateSuspicionIfShared(); this.readerShuttingDown = true; try { requestClose(LocalizedStrings.Connection_IOEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e)); @@ -1900,6 +1901,16 @@ public class Connection implements Runnable { } } + /** initiate suspect processing if a shared/ordered connection is lost and we're not shutting down */ + private void initiateSuspicionIfShared() { + if (this.isReceiver && this.handshakeRead && this.preserveOrder && this.sharedResource) { + if (this.owner.getConduit().getCancelCriterion().cancelInProgress() == null) { + this.owner.getDM().getMembershipManager().suspectMember(this.getRemoteAddress(), + "member shut down shared/ordered connection"); + } + } + } + /** * checks to see if an exception should not be logged: i.e., "forcibly closed", * "reset by peer", or "connection reset" @@ -2397,6 +2408,7 @@ public class Connection implements Runnable { logger.debug("{} io exception for {}", p2pReaderName(), this, io); } } + initiateSuspicionIfShared(); this.readerShuttingDown = true; try { requestClose(LocalizedStrings.Connection_IOEXCEPTION_RECEIVED_0.toLocalizedString(io)); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java index 92cc6ad..7f43557 100755 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java @@ -14,13 +14,14 @@ import java.io.File; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Set; import junit.framework.TestCase; import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.core.config.ConfigurationFactory; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -39,7 +40,6 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJo import com.gemstone.gemfire.internal.AvailablePortHelper; import com.gemstone.gemfire.internal.SocketCreator; import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig; -import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.test.junit.categories.IntegrationTest; @Category(IntegrationTest.class) @@ -96,7 +96,7 @@ public class MembershipJUnitTest extends TestCase { for (i=0; i empty = Collections.emptyList(); + Set empty = Collections.emptySet(); NetView lastView = new NetView(members[0], 4, vmbrs, empty, empty); InternalDistributedMember leader = members[2]; assertTrue(!leader.getNetMember().preferredForCoordinator()); @@ -108,7 +108,7 @@ public class MembershipJUnitTest extends TestCase { // have the joining member and another cache process (weight 10) in the failed members // collection and check to make sure that the joining member is not included in failed // weight calcs. - List failedMembers = new ArrayList(3); + Set failedMembers = new HashSet<>(3); failedMembers.add(joiningMember); failedMembers.add(members[members.length-1]); // cache failedMembers.add(members[members.length-2]); // admin http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java index 079bbb7..8910d77 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import com.gemstone.gemfire.distributed.internal.DistributionConfig; @@ -98,7 +99,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testHMNextNeighbour() throws IOException { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); when(services.getJoinLeave().getMemberID()).thenAnswer(messageSent); @@ -114,7 +115,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testHMNextNeighbourVerify() throws IOException { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); @@ -131,7 +132,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testHMNextNeighbourAfterTimeout() throws IOException { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); // 3rd is current member when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); @@ -154,7 +155,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testHMNextNeighbourBeforeTimeout() throws IOException { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); // 3rd is current member when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); @@ -175,7 +176,7 @@ public class GMSHealthMonitorJUnitTest { */ @Test public void testSuspectMembersCalledThroughMemberCheckThread() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member @@ -201,7 +202,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testSuspectMembersNotCalledThroughPingThreadBeforeTimeout() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member @@ -226,7 +227,7 @@ public class GMSHealthMonitorJUnitTest { */ @Test public void testSuspectMembersCalledThroughSuspectThread() throws Exception { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member @@ -249,7 +250,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testSuspectMembersNotCalledThroughSuspectThreadBeforeTimeout() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member @@ -276,7 +277,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testRemoveMemberCalled() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member @@ -311,7 +312,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testRemoveMemberNotCalledBeforeTimeout() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member @@ -347,7 +348,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testRemoveMemberCalledAfterDoingFinalCheckOnCoordinator() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); // preferred coordinators are 0 and 1 @@ -383,7 +384,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testCheckIfAvailable() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); // 3rd is current member when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); @@ -403,7 +404,7 @@ public class GMSHealthMonitorJUnitTest { @Test public void testShutdown() { - NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList(), new ArrayList()); + NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet(), new HashSet()); MethodExecuted messageSent = new MethodExecuted(); // 3rd is current member http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/972b6056/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java index bb5abe7..fac9ae8 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java @@ -5,19 +5,17 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import java.io.IOException; import java.net.UnknownHostException; -import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -135,9 +133,9 @@ public class GMSJoinLeaveJUnitTest { private void prepareView() throws IOException { int viewId = 1; - List mbrs = new LinkedList(); - List shutdowns = new LinkedList(); - List crashes = new LinkedList(); + List mbrs = new LinkedList<>(); + Set shutdowns = new HashSet<>(); + Set crashes = new HashSet<>(); mbrs.add(mockMembers[0]); when(services.getMessenger()).thenReturn(messenger); @@ -151,9 +149,9 @@ public class GMSJoinLeaveJUnitTest { private void prepareAndInstallView() throws IOException { int viewId = 1; - List mbrs = new LinkedList(); - List shutdowns = new LinkedList(); - List crashes = new LinkedList(); + List mbrs = new LinkedList<>(); + Set shutdowns = new HashSet<>(); + Set crashes = new HashSet<>(); mbrs.add(mockMembers[0]); when(services.getMessenger()).thenReturn(messenger); @@ -187,9 +185,9 @@ public class GMSJoinLeaveJUnitTest { initMocks(); prepareAndInstallView(); - List mbrs = new LinkedList(); - List shutdowns = new LinkedList(); - List crashes = new LinkedList(); + List mbrs = new LinkedList<>(); + Set shutdowns = new HashSet<>(); + Set crashes = new HashSet<>(); mbrs.add(mockMembers[0]); mbrs.add(mockMembers[1]); @@ -208,9 +206,9 @@ public class GMSJoinLeaveJUnitTest { prepareAndInstallView(); int viewId = 2; - List mbrs = new LinkedList(); - List shutdowns = new LinkedList(); - List crashes = new LinkedList(); + List mbrs = new LinkedList<>(); + Set shutdowns = new HashSet<>(); + Set crashes = new HashSet<>(); mbrs.add(mockMembers[1]); mbrs.add(mockMembers[2]); mbrs.add(mockMembers[3]); @@ -452,9 +450,9 @@ public class GMSJoinLeaveJUnitTest { // set up a view with sufficient members, then create a new view // where enough weight is lost to cause a network partition - List mbrs = new LinkedList(); - List shutdowns = new LinkedList(); - List crashes = new LinkedList(); + List mbrs = new LinkedList<>(); + Set shutdowns = new HashSet<>(); + Set crashes = new HashSet<>(); mbrs.add(mockMembers[0]); mbrs.add(mockMembers[1]); mbrs.add(mockMembers[2]); @@ -466,7 +464,7 @@ public class GMSJoinLeaveJUnitTest { InstallViewMessage installViewMessage = new InstallViewMessage(newView, credentials, false); gmsJoinLeave.processMessage(installViewMessage); - crashes = new LinkedList<>(crashes); + crashes = new HashSet<>(crashes); crashes.add(mockMembers[1]); crashes.add(mockMembers[2]); mbrs = new LinkedList<>(mbrs); @@ -488,9 +486,9 @@ public class GMSJoinLeaveJUnitTest { // set up a view with sufficient members, then create a new view // where enough weight is lost to cause a network partition - List mbrs = new LinkedList(); - List shutdowns = new LinkedList(); - List crashes = new LinkedList(); + List mbrs = new LinkedList<>(); + Set shutdowns = new HashSet<>(); + Set crashes = new HashSet<>(); mbrs.add(mockMembers[0]); mbrs.add(mockMembers[1]); mbrs.add(mockMembers[2]); @@ -502,7 +500,7 @@ public class GMSJoinLeaveJUnitTest { InstallViewMessage installViewMessage = new InstallViewMessage(newView, credentials, false); gmsJoinLeave.processMessage(installViewMessage); - crashes = new LinkedList<>(crashes); + crashes = new HashSet<>(crashes); crashes.add(mockMembers[1]); crashes.add(mockMembers[2]); mbrs = new LinkedList<>(mbrs);