From: upthewaterspout@apache.org
To: commits@geode.incubator.apache.org
Date: Mon, 22 Feb 2016 18:35:59 -0000
In-Reply-To: <0806417cf36345658593943fc13e8a5d@git.apache.org>
References: <0806417cf36345658593943fc13e8a5d@git.apache.org>
Subject: [37/83] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index d75b28d,0000000..edfee10
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@@ -1,2652 -1,0 +1,2655 @@@
+/*
+ * Licensed to the Apache Software Foundation
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.distributed.internal.membership.gms.mgr; + +import java.io.IOException; +import java.io.NotSerializableException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.CancelException; +import com.gemstone.gemfire.ForcedDisconnectException; +import com.gemstone.gemfire.GemFireConfigException; +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.SystemConnectException; +import com.gemstone.gemfire.SystemFailure; +import com.gemstone.gemfire.ToDataException; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.DistributedSystem; +import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.AdminMessageType; +import com.gemstone.gemfire.distributed.internal.DMStats; +import com.gemstone.gemfire.distributed.internal.DSClock; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.DistributionException; +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.distributed.internal.DistributionMessage; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.InternalLocator; +import com.gemstone.gemfire.distributed.internal.OverflowQueueWithDMStats; +import com.gemstone.gemfire.distributed.internal.SizeableRunnable; +import com.gemstone.gemfire.distributed.internal.StartupMessage; +import com.gemstone.gemfire.distributed.internal.direct.DirectChannel; +import com.gemstone.gemfire.distributed.internal.direct.DirectChannelListener; +import com.gemstone.gemfire.distributed.internal.direct.ShunnedMemberException; +import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import 
com.gemstone.gemfire.distributed.internal.membership.MembershipManager; +import com.gemstone.gemfire.distributed.internal.membership.MembershipTestHook; +import com.gemstone.gemfire.distributed.internal.membership.NetView; +import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker; +import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember; +import com.gemstone.gemfire.distributed.internal.membership.gms.Services; +import com.gemstone.gemfire.distributed.internal.membership.gms.SuspectMember; +import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor; +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager; +import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave; +import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.GMSQuorumChecker; +import com.gemstone.gemfire.internal.Assert; +import com.gemstone.gemfire.internal.SystemTimer; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig; ++import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.xmlcache.CacheServerCreation; +import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.log4j.AlertAppender; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.internal.logging.log4j.LogMarker; +import com.gemstone.gemfire.internal.shared.StringPrintWriter; +import com.gemstone.gemfire.internal.tcp.ConnectExceptions; +import com.gemstone.gemfire.internal.tcp.MemberShunnedException; +import com.gemstone.gemfire.internal.util.Breadcrumbs; + +public class GMSMembershipManager implements MembershipManager, Manager +{ + private static final Logger logger = Services.getLogger(); + + /** product version to use for multicast serialization */ + volatile boolean disableMulticastForRollingUpgrade; + + /** + * set to true if the distributed system that created this manager was + * auto-reconnecting when it was created. + */ + boolean wasReconnectingSystem; + + /** + * A quorum checker is created during reconnect and is held + * here so it is available to the UDP protocol for passing off + * the ping-pong responses used in the quorum-checking algorithm. + */ + private volatile QuorumChecker quorumChecker; + + /** + * thread-local used to force use of Messenger for communications, usually to + * avoid deadlock when conserve-sockets=true. 
Use of this should be removed + * when connection pools are implemented in the direct-channel + */ + private ThreadLocal forceUseUDPMessaging = new ThreadLocal(); + + /** + * Trick class to make the startup synch more + * visible in stack traces + * + * @see GMSMembershipManager#startupLock + */ + static class EventProcessingLock { + public EventProcessingLock() { + } + } + + static class StartupEvent { + static final int SURPRISE_CONNECT = 1; + static final int VIEW = 2; + static final int MESSAGE = 3; + + /** + * indicates whether the event is a departure, a surprise connect + * (i.e., before the view message arrived), a view, or a regular + * message + * + * @see #SURPRISE_CONNECT + * @see #VIEW + * @see #MESSAGE + */ + private int kind; + + // Miscellaneous state depending on the kind of event + InternalDistributedMember member; + boolean crashed; + String reason; + DistributionMessage dmsg; + NetView gmsView; + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("kind="); + switch (kind) { + case SURPRISE_CONNECT: + sb.append("connect; member = <" + member + ">"); + break; + case VIEW: + String text = gmsView.toString(); + sb.append("view <" + text + ">"); + break; + case MESSAGE: + sb.append("message <" + dmsg + ">"); + break; + default: + sb.append("unknown=<" + kind + ">"); + break; + } + return sb.toString(); + } + + /** + * Create a surprise connect event + * @param member the member connecting + */ + StartupEvent(final InternalDistributedMember member) { + this.kind = SURPRISE_CONNECT; + this.member = member; + } + /** + * Indicate if this is a surprise connect event + * @return true if this is a connect event + */ + boolean isSurpriseConnect() { + return this.kind == SURPRISE_CONNECT; + } + + /** + * Create a view event + * @param v the new view + */ + StartupEvent(NetView v) { + this.kind = VIEW; + this.gmsView = v; + } + + /** + * Indicate if this is a view event + * @return true if this is a view event + */ + boolean isGmsView() { + return this.kind == VIEW; + } + + /** + * Create a message event + * @param d the message + */ + StartupEvent(DistributionMessage d) { + this.kind = MESSAGE; + this.dmsg = d; + } + /** + * Indicate if this is a message event + * @return true if this is a message event + */ + boolean isDistributionMessage() { + return this.kind == MESSAGE; + } + } + + private int membershipCheckTimeout = DistributionConfig.DEFAULT_SECURITY_PEER_VERIFYMEMBER_TIMEOUT; + + /** + * This object synchronizes threads waiting for + * startup to finish. Updates to {@link #startupMessages} + * are synchronized through this object. 
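+ * <p>
+ * A minimal sketch of the hand-off this lock mediates (the real logic
+ * lives in handleOrDeferMessage and startEventProcessing below):
+ * <pre>
+ * synchronized (startupLock) {
+ *   if (!processingEvents) {
+ *     startupMessages.add(new StartupEvent(msg)); // defer until startup drains
+ *     return;
+ *   }
+ * }
+ * dispatchMessage(msg); // dispatch outside the lock
+ * </pre>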
+ */ + protected final EventProcessingLock startupLock = new EventProcessingLock(); + + /** + * This is the latest view (ordered list of DistributedMembers) + * that has been installed + * + * All accesses to this object are protected via {@link #latestViewLock} + */ + protected NetView latestView = new NetView(); + + /** + * This is the lock for protecting access to latestView + * + * @see #latestView + */ + protected ReadWriteLock latestViewLock = new ReentrantReadWriteLock(); + + /** + * This is the listener that accepts our membership events + */ + protected com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener listener; + + /** + * Membership failure listeners - for testing + */ + List membershipTestHooks; + + /** + * This is a representation of the local member (ourself) + */ + protected InternalDistributedMember address = null; // new DistributedMember(-1); + + protected DirectChannel directChannel; + + protected MyDCReceiver dcReceiver; + + volatile boolean isJoining; + + /** have we joined successfully? */ + volatile boolean hasJoined; + + /** + * Members of the distributed system that we believe have shut down. + * Keys are instances of {@link InternalDistributedMember}, values are + * Longs indicating the time this member was shunned. + * + * Members are removed after {@link #SHUNNED_SUNSET} seconds have + * passed. + * + * Accesses to this list needs to be under the read or write lock of {@link #latestViewLock} + * + * @see System#currentTimeMillis() + */ +// protected final Set shunnedMembers = Collections.synchronizedSet(new HashSet()); + protected final Map shunnedMembers = new ConcurrentHashMap(); + + /** + * Members that have sent a shutdown message. This is used to suppress + * suspect processing that otherwise becomes pretty aggressive + * when a member is shutting down. + */ + private final Map shutdownMembers = new BoundedLinkedHashMap(1000); + + /** + * per bug 39552, keep a list of members that have been shunned and for + * which a message is printed. Contents of this list are cleared at the + * same time they are removed from {@link #shunnedMembers}. + * + * Accesses to this list needs to be under the read or write lock of {@link #latestViewLock} + */ + protected final HashSet shunnedAndWarnedMembers = new HashSet(); + /** + * The identities and birth-times of others that we have allowed into + * membership at the distributed system level, but have not yet appeared + * in a view. + *
+ * <p>
+ * Keys are instances of {@link InternalDistributedMember}, values are + * Longs indicating the birth time of this member (when it was first seen). + *
+ * <p>
+ * Members are removed when a view containing them is processed. If, + * after {@link #surpriseMemberTimeout} milliseconds have passed, a view + * containing the member has not arrived, the member is removed from + * membership and member-left notification is performed. + *
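+ * <p>
+ * The expiry check reduces to a birth-time comparison, as in the cleanup
+ * task below:
+ * <pre>
+ * long oldestAllowed = System.currentTimeMillis() - surpriseMemberTimeout;
+ * if (birthtime.longValue() < oldestAllowed) {
+ *   removeWithViewLock(member, true,
+ *       "not seen in membership view in " + surpriseMemberTimeout + "ms");
+ * }
+ * </pre>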
+ * <p>
+ * Accesses to this list need to be under the read or write lock of {@link #latestViewLock} + * + * @see System#currentTimeMillis() + */
+ protected final Map surpriseMembers = new ConcurrentHashMap(); +
+ /** + * the timeout interval for surprise members. This is calculated from + * the member-timeout setting + */
+ protected int surpriseMemberTimeout; +
+ /** + * JGroups can skip views and omit telling us about a crashed member. + * This map holds a history of suspected members that we use to detect + * crashes. + */
+ private final Map suspectedMembers = new ConcurrentHashMap(); +
+ /** + * the timeout interval for suspected members + */
+ private final long suspectMemberTimeout = 180000; +
+ /** + * Length of time, in seconds, that a member is retained in the zombie set + * + * @see #shunnedMembers + */
+ static private final int SHUNNED_SUNSET = Integer.getInteger( + "gemfire.shunned-member-timeout", 300).intValue(); +
+ /** + * Set to true when the service should stop. + */
+ protected volatile boolean shutdownInProgress = false; +
+ /** + * Set to true when upcalls should be generated for + * events. + */
+ protected volatile boolean processingEvents = false; +
+ /** + * This is the latest viewId installed + */
+ long latestViewId = -1; +
+ /** distribution manager statistics */ + DMStats stats; +
+ /** + A list of messages received during channel startup that couldn't be processed yet. + Additions or removals of this list must be synchronized + via {@link #startupLock}. + @since 5.0 + */
+ protected LinkedList startupMessages = new LinkedList(); +
+ /** + * ARB: the map of latches is used to block peer handshakes till + * authentication is confirmed. + */
+ final private HashMap memberLatch = new HashMap(); +
+ /** + * Insert our own MessageReceiver between us and the direct channel, in order + * to correctly filter membership events. + * + * @author jpenney + * + */
+ class MyDCReceiver implements DirectChannelListener + { + + DirectChannelListener upCall; +
+ /** + * Don't provide events until the caller has told us we are ready. + * + * Synchronization provided via GroupMembershipService.class. + * + * Note that in practice we only need to delay accepting the first + * client; we don't need to put this check before every call... + * + */
+ MyDCReceiver(DirectChannelListener up) { + upCall = up; + } +
+ public void messageReceived(DistributionMessage msg) { + // bug 36851 - notify failure detection that we've had contact from a member + services.getHealthMonitor().contactedBy(msg.getSender()); + handleOrDeferMessage(msg); + } +
+ public DistributionManager getDM() { + return upCall.getDM(); + } + + } + +
+ /** if we connect to a locator that has NPD enabled then we enable it in this VM */
+ public void enableNetworkPartitionDetection() { + if (logger.isDebugEnabled()) { + logger.debug("Network partition detection is being enabled"); + } + this.services.getConfig().getDistributionConfig().setEnableNetworkPartitionDetection(true); + this.services.getConfig().setNetworkPartitionDetectionEnabled(true); + } +
+ /** + * Analyze a given view object, generate events as appropriate + * + * @param newViewId the identifier of the new view + * @param newView the new view + */
+ protected void processView(long newViewId, NetView newView) + { + // Sanity check...
+ if (logger.isDebugEnabled()) { + StringBuffer msg = new StringBuffer(200); + msg.append("Membership: Processing view "); + msg.append(newView); + msg.append("} on " + address.toString()); + if (!newView.contains(address)) { + logger.info(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_THE_MEMBER_WITH_ID_0_IS_NO_LONGER_IN_MY_OWN_VIEW_1, + new Object[] {address, newView})); + } + } + +// if (newView.getCrashedMembers().size() > 0) { +// // dump stack for debugging #39827 +// OSProcess.printStacks(0); +// } + // We perform the update under a global lock so that other + // incoming events will not be lost in terms of our global view. + latestViewLock.writeLock().lock(); + try { + // first determine the version for multicast message serialization + Version version = Version.CURRENT; + for (Iterator> it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) { + InternalDistributedMember mbr = it.next().getKey(); + Version itsVersion = mbr.getVersionObject(); + if (itsVersion != null && version.compareTo(itsVersion) < 0) { + version = itsVersion; + } + } + for (InternalDistributedMember mbr: newView.getMembers()) { + Version itsVersion = mbr.getVersionObject(); + if (itsVersion != null && itsVersion.compareTo(version) < 0) { + version = mbr.getVersionObject(); + } + } + disableMulticastForRollingUpgrade = !version.equals(Version.CURRENT); + + if (newViewId < latestViewId) { + // ignore this view since it is old news + return; + } + + // Save previous view, for delta analysis + NetView priorView = latestView; + + // update the view to reflect our changes, so that + // callbacks will see the new (updated) view. + latestViewId = newViewId; + latestView = new NetView(newView, newView.getViewId()); + + // look for additions + for (int i = 0; i < newView.getMembers().size(); i++) { // additions + InternalDistributedMember m = (InternalDistributedMember)newView.getMembers().get(i); + + // Once a member has been seen via a view, remove them from the + // newborn set + boolean wasSurprise = surpriseMembers.remove(m) != null; + + // bug #45155 - membership view processing was slow, causing a member to connect as "surprise" + // and the surprise timeout removed the member and shunned it, keeping it from being + // recognized as a valid member when it was finally seen in a view +// if (isShunned(m)) { +// warnShuns.add(m); +// continue; +// } + + // if it's in a view, it's no longer suspect + suspectedMembers.remove(m); + + if (priorView.contains(m) || wasSurprise) { + continue; // already seen + } + + // ARB: unblock any waiters for this particular member. + // i.e. signal any waiting threads in tcpconduit. 
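+ // memberLatch (declared above) holds a CountDownLatch per authenticating peer; + // counting it down below releases any handshake blocked on this member.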
+ String authInit = this.services.getConfig().getDistributionConfig().getSecurityPeerAuthInit(); + boolean isSecure = authInit != null && authInit.length() != 0; + + if (isSecure) { + CountDownLatch currentLatch; + if ((currentLatch = (CountDownLatch)memberLatch.get(m)) != null) { + currentLatch.countDown(); + } + } + + if (shutdownInProgress()) { + addShunnedMember(m); + continue; // no additions processed after shutdown begins + } else { + boolean wasShunned = endShun(m); // bug #45158 - no longer shun a process that is now in view + if (wasShunned && logger.isDebugEnabled()) { + logger.debug("No longer shunning {} as it is in the current membership view", m); + } + } + + logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_PROCESSING_ADDITION__0_, m)); + + try { + listener.newMemberConnected(m); + } + catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + // If this ever returns, rethrow the error. We're poisoned + // now, so don't let this thread continue. + throw err; + } + catch (DistributedSystemDisconnectedException e) { + // don't log shutdown exceptions + } + catch (Throwable t) { + // Whenever you catch Error or Throwable, you must also + // catch VirtualMachineError (see above). However, there is + // _still_ a possibility that you are dealing with a cascading + // error condition, so you also need to check to see if the JVM + // is still usable: + SystemFailure.checkFailure(); + logger.info(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_ADDITION_OF__0, m), t); + } + } // additions + + // look for departures + for (int i = 0; i < priorView.getMembers().size(); i++) { // departures + InternalDistributedMember m = (InternalDistributedMember)priorView.getMembers().get(i); + if (newView.contains(m)) { + continue; // still alive + } + + if (surpriseMembers.containsKey(m)) { + continue; // member has not yet appeared in a view + } + + try { + removeWithViewLock(m, + newView.getCrashedMembers().contains(m) || suspectedMembers.containsKey(m) + , "departed membership view"); + } + catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + // If this ever returns, rethrow the error. We're poisoned + // now, so don't let this thread continue. + throw err; + } + catch (Throwable t) { + // Whenever you catch Error or Throwable, you must also + // catch VirtualMachineError (see above). 
However, there is + // _still_ a possibility that you are dealing with a cascading + // error condition, so you also need to check to see if the JVM + // is still usable: + SystemFailure.checkFailure(); + logger.info(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_REMOVAL_OF__0, m), t); + } + } // departures + + // expire surprise members, add others to view + long oldestAllowed = System.currentTimeMillis() - this.surpriseMemberTimeout; + for (Iterator> it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = it.next(); + Long birthtime = (Long)entry.getValue(); + if (birthtime.longValue() < oldestAllowed) { + it.remove(); + InternalDistributedMember m = entry.getKey(); + logger.info(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_MEMBERSHIP_EXPIRING_MEMBERSHIP_OF_SURPRISE_MEMBER_0, m)); + removeWithViewLock(m, true, "not seen in membership view in " + + this.surpriseMemberTimeout + "ms"); + } + else { + if (!latestView.contains(entry.getKey())) { + latestView.add(entry.getKey()); + } + } + } + // expire suspected members + oldestAllowed = System.currentTimeMillis() - this.suspectMemberTimeout; + for (Iterator it=suspectedMembers.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = (Map.Entry)it.next(); + Long birthtime = (Long)entry.getValue(); + if (birthtime.longValue() < oldestAllowed) { + InternalDistributedMember m = (InternalDistributedMember)entry.getKey(); + it.remove(); + } + } + try { + listener.viewInstalled(latestView); + startCleanupTimer(); + } + catch (DistributedSystemDisconnectedException se) { + } + } finally { + latestViewLock.writeLock().unlock(); + } + } + + /** + * the timer used to perform periodic tasks + * + * Concurrency: protected by {@link #latestViewLock} ReentrantReadWriteLock + */ + private SystemTimer cleanupTimer; + + private Services services; + + private boolean mcastEnabled; + + private boolean tcpDisabled; + + + @Override + public boolean isMulticastAllowed() { + return !disableMulticastForRollingUpgrade; + } + + /** + * Joins the distributed system + * + * @throws GemFireConfigException - configuration error + * @throws SystemConnectException - problem joining + */ + private void join() { + services.setShutdownCause(null); + services.getCancelCriterion().cancel(null); + + latestViewLock.writeLock().lock(); + try { + try { + this.isJoining = true; // added for bug #44373 + + // connect + long start = System.currentTimeMillis(); + + boolean ok = services.getJoinLeave().join(); + + if (!ok) { + throw new GemFireConfigException("Unable to join the distributed system. 
" + + "Operation either timed out, was stopped or Locator does not exist."); + } + + long delta = System.currentTimeMillis() - start; + + logger.info(LogMarker.DISTRIBUTION, LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_JOINED_TOOK__0__MS, delta)); + + NetView initialView = services.getJoinLeave().getView(); + latestView = new NetView(initialView, initialView.getViewId()); + listener.viewInstalled(latestView); + + } catch (RuntimeException ex) { + throw ex; + } + catch (Exception ex) { + if (ex.getCause() != null && ex.getCause().getCause() instanceof SystemConnectException) { + throw (SystemConnectException)(ex.getCause().getCause()); + } + throw new DistributionException(LocalizedStrings.GroupMembershipService_AN_EXCEPTION_WAS_THROWN_WHILE_JOINING.toLocalizedString(), ex); + } + finally { + this.isJoining = false; + } + } + finally { + latestViewLock.writeLock().unlock(); + } + } + + + public GMSMembershipManager(DistributedMembershipListener listener) { + Assert.assertTrue(listener != null); + this.listener = listener; + } + + @Override + public void init(Services services) { + this.services = services; + + Assert.assertTrue(services != null); + + this.stats = services.getStatistics(); + DistributionConfig config = services.getConfig().getDistributionConfig(); + RemoteTransportConfig transport = services.getConfig().getTransport(); + + this.membershipCheckTimeout = config.getSecurityPeerMembershipTimeout(); + this.wasReconnectingSystem = transport.getIsReconnectingDS(); + + // cache these settings for use in send() + this.mcastEnabled = transport.isMcastEnabled(); + this.tcpDisabled = transport.isTcpDisabled(); + + if (!this.tcpDisabled) { + dcReceiver = new MyDCReceiver(listener); + } + + surpriseMemberTimeout = Math.max(20 * DistributionConfig.DEFAULT_MEMBER_TIMEOUT, + 20 * config.getMemberTimeout()); + surpriseMemberTimeout = Integer.getInteger("gemfire.surprise-member-timeout", surpriseMemberTimeout).intValue(); + + } + + @Override + public void start() { + DistributionConfig config = services.getConfig().getDistributionConfig(); + RemoteTransportConfig transport = services.getConfig().getTransport(); + + int dcPort = 0; + if (!tcpDisabled) { + directChannel = new DirectChannel(this, dcReceiver, config); + dcPort = directChannel.getPort(); + } + + + services.getMessenger().getMemberID().setDirectChannelPort(dcPort); + + } + + + @Override + public void joinDistributedSystem() { + long startTime = System.currentTimeMillis(); + + try { + join(); + } + catch (RuntimeException e) { + if (directChannel != null) { + directChannel.disconnect(e); + } + throw e; + } + + this.address = services.getMessenger().getMemberID(); + + int dcPort = 0; + if (directChannel != null) { + dcPort = directChannel.getPort(); + } + + if (directChannel != null) { + directChannel.setLocalAddr(address); + } + + this.hasJoined = true; + + // in order to debug startup issues we need to announce the membership + // ID as soon as we know it + logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_entered_into_membership_in_group_0_with_id_1, + new Object[]{""+(System.currentTimeMillis()-startTime)})); + + } + + @Override + public void started() { + } + + + /** this is invoked by JoinLeave when there is a loss of quorum in the membership system */ + public void quorumLost(Collection failures, NetView view) { + // notify of quorum loss if split-brain detection is enabled (meaning we'll shut down) or + // if the loss is more than one member + + boolean notify = failures.size() > 
1; + if (!notify) { + notify = services.getConfig().isNetworkPartitionDetectionEnabled(); + } + + if (notify) { + List remaining = new ArrayList(view.getMembers()); + remaining.removeAll(failures); + + if (inhibitForceDisconnectLogging) { + if (logger.isDebugEnabled()) { + logger.debug("Possible loss of quorum"); + } + } + logger.fatal(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_POSSIBLE_LOSS_OF_QUORUM_DETECTED, new Object[] {failures.size(), failures})); + if (inhibitForceDisconnectLogging) { + if (logger.isDebugEnabled()) { + logger.debug("Possible loss of quorum"); + } + } + + + try { + this.listener.quorumLost(new HashSet(failures), + remaining); + } catch (CancelException e) { + // safe to ignore - a forced disconnect probably occurred + } + } + } + + + @Override + public boolean testMulticast() { + try { + return services.getMessenger().testMulticast(services.getConfig().getMemberTimeout()); + } catch (InterruptedException e) { + services.getCancelCriterion().checkCancelInProgress(e); + Thread.currentThread().interrupt(); + return false; + } + } + + /** + * Remove a member. {@link #latestViewLock} must be held + * before this method is called. If member is not already shunned, + * the uplevel event handler is invoked. + * + * @param dm + * @param crashed + * @param reason + */ + protected void removeWithViewLock(InternalDistributedMember dm, + boolean crashed, String reason) { + boolean wasShunned = isShunned(dm); + + // Delete resources + destroyMember(dm, crashed, reason); + + if (wasShunned) { + return; // Explicit deletion, no upcall. + } + + try { + listener.memberDeparted(dm, crashed, reason); + } + catch (DistributedSystemDisconnectedException se) { + // let's not get huffy about it + } + } + + /** + * Process a surprise connect event, or place it on the startup queue. + * @param member the member + */ + protected void handleOrDeferSurpriseConnect(InternalDistributedMember member) { + synchronized (startupLock) { + if (!processingEvents) { + startupMessages.add(new StartupEvent(member)); + return; + } + } + processSurpriseConnect(member); + } + + public void startupMessageFailed(DistributedMember mbr, String failureMessage) { + // fix for bug #40666 + addShunnedMember((InternalDistributedMember)mbr); + // fix for bug #41329, hang waiting for replies + try { + listener.memberDeparted((InternalDistributedMember)mbr, true, "failed to pass startup checks"); + } + catch (DistributedSystemDisconnectedException se) { + // let's not get huffy about it + } + } + + + /** + * Logic for handling a direct connection event (message received + * from a member not in the view). Does not employ the + * startup queue. + *
+ * <p>
+ * Must be called with {@link #latestViewLock} held. Waits + * until there is a stable view. If the member has already + * been added, simply returns; else adds the member. + * + * @param dm the member joining + */ + public boolean addSurpriseMember(DistributedMember dm) { + final InternalDistributedMember member = (InternalDistributedMember)dm; + boolean warn = false; + + latestViewLock.writeLock().lock(); + try { + // At this point, the join may have been discovered by + // other means. + if (latestView.contains(member)) { + return true; + } + if (surpriseMembers.containsKey(member)) { + return true; + } + if (member.getVmViewId() < 0) { + logger.warn("adding a surprise member that has not yet joined the distributed system: " + member, new Exception("stack trace")); + } + if (latestView.getViewId() > member.getVmViewId()) { + // tell the process that it should shut down distribution. + // Run in a separate thread so we don't hold the view lock during the request. Bug #44995 + new Thread(Thread.currentThread().getThreadGroup(), + "Removing shunned GemFire node " + member) { + @Override + public void run() { + // fix for bug #42548 + // this is an old member that shouldn't be added + logger.warn(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_Invalid_Surprise_Member, new Object[]{member, latestView})); + requestMemberRemoval(member, "this member is no longer in the view but is initiating connections"); + } + }.start(); + addShunnedMember(member); + return false; + } + + // Adding him to this set ensures we won't remove him if a new + // view comes in and he's still not visible. + surpriseMembers.put(member, Long.valueOf(System.currentTimeMillis())); + + if (shutdownInProgress()) { + // Force disconnect, esp. the TCPConduit + String msg = LocalizedStrings.GroupMembershipService_THIS_DISTRIBUTED_SYSTEM_IS_SHUTTING_DOWN.toLocalizedString(); + if (directChannel != null) { + try { + directChannel.closeEndpoint(member, msg); + } catch (DistributedSystemDisconnectedException e) { + // ignore - happens during shutdown + } + } + destroyMember(member, false, msg); // for good luck + return true; // allow during shutdown + } + + if (isShunned(member)) { + warn = true; + surpriseMembers.remove(member); + } else { + + // Now that we're sure the member is new, add them. + // make sure the surprise-member cleanup task is running + if (this.cleanupTimer == null) { + startCleanupTimer(); + } // cleanupTimer == null + + // Ensure that the member is accounted for in the view + // Conjure up a new view including the new member. This is necessary + // because we are about to tell the listener about a new member, so + // the listener should rightfully expect that the member is in our + // membership view. + + // However, we put the new member at the end of the list. This + // should ensure he's not chosen as an elder. + // This will get corrected when he finally shows up in the + // view. 
+ NetView newMembers = new NetView(latestView, latestView.getViewId()); + newMembers.add(member); + latestView = newMembers; + } + } finally { + latestViewLock.writeLock().unlock(); + } + if (warn) { // fix for bug #41538 - deadlock while alerting + logger.warn(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_MEMBERSHIP_IGNORING_SURPRISE_CONNECT_FROM_SHUNNED_MEMBER_0, member)); + } else { + listener.newMemberConnected(member); + } + return !warn; + } + + + /** starts periodic task to perform cleanup chores such as expire surprise members */ + private void startCleanupTimer() { + latestViewLock.writeLock().lock(); + try { + if (this.cleanupTimer != null) { + return; + } + DistributedSystem ds = InternalDistributedSystem.getAnyInstance(); + if (ds != null && ds.isConnected()) { + this.cleanupTimer = new SystemTimer(ds, true); + SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + latestViewLock.writeLock().lock(); + try { + long oldestAllowed = System.currentTimeMillis() - surpriseMemberTimeout; + for (Iterator it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = (Map.Entry)it.next(); + Long birthtime = (Long)entry.getValue(); + if (birthtime.longValue() < oldestAllowed) { + it.remove(); + InternalDistributedMember m = (InternalDistributedMember)entry.getKey(); + logger.info(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_MEMBERSHIP_EXPIRING_MEMBERSHIP_OF_SURPRISE_MEMBER_0, m)); + removeWithViewLock(m, true, "not seen in membership view in " + + surpriseMemberTimeout + "ms"); + } + } + } finally { + latestViewLock.writeLock().unlock(); + } + } + }; + this.cleanupTimer.scheduleAtFixedRate(st, surpriseMemberTimeout, surpriseMemberTimeout/3); + } // ds != null && ds.isConnected() + } finally { + latestViewLock.writeLock().unlock(); + } + } + /** + * Dispatch the distribution message, or place it on the startup queue. + * + * @param msg the message to process + */ + protected void handleOrDeferMessage(DistributionMessage msg) { + synchronized(startupLock) { + if (!processingEvents) { + startupMessages.add(new StartupEvent(msg)); + return; + } + } + dispatchMessage(msg); + } + + public void warnShun(DistributedMember m) { + latestViewLock.writeLock().lock(); + try { + if (!shunnedMembers.containsKey(m)) { + return; // not shunned + } + if (shunnedAndWarnedMembers.contains(m)) { + return; // already warned + } + shunnedAndWarnedMembers.add(m); + } finally { + latestViewLock.writeLock().unlock(); + } + // issue warning outside of sync since it may cause messaging and we don't + // want to hold the view lock while doing that + logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_DISREGARDING_SHUNNED_MEMBER_0, m)); + } + + @Override + public void processMessage(DistributionMessage msg) { + handleOrDeferMessage(msg); + } + + /** + * Logic for processing a distribution message. + *
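+ * <p>
+ * Callers must be prepared for {@link MemberShunnedException}; a minimal
+ * sketch of the pattern used in processStartupEvent:
+ * <pre>
+ * try {
+ *   dispatchMessage(msg);
+ * } catch (MemberShunnedException e) {
+ *   // message from a non-member - ignore it
+ * }
+ * </pre>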
+ * <p>
+ * It is possible to receive messages not consistent with our view. + * We handle this here, and generate an uplevel event if necessary + * @param msg the message + */ + public void dispatchMessage(DistributionMessage msg) { + boolean isNew = false; + InternalDistributedMember m = msg.getSender(); + boolean shunned = false; + + // First grab the lock: check the sender against our stabilized view. + latestViewLock.writeLock().lock(); + try { + if (isShunned(m)) { + if (msg instanceof StartupMessage) { + endShun(m); + } + else { + // fix for bug 41538 - sick alert listener causes deadlock + // due to view lock being held during messaging + shunned = true; + } + } // isShunned + + if (!shunned) { + isNew = !latestView.contains(m) && !surpriseMembers.containsKey(m); + + // If it's a new sender, wait our turn, generate the event + if (isNew) { + shunned = !addSurpriseMember(m); + } // isNew + } + + // Latch the view before we unlock + } finally { + latestViewLock.writeLock().unlock(); + } + + if (shunned) { // bug #41538 - shun notification must be outside synchronization to avoid hanging + warnShun(m); + logger.info("Membership: Ignoring message from shunned member <{}>:{}", m, msg); + throw new MemberShunnedException(m); + } + + listener.messageReceived(msg); + } + + /** + * Process a new view object, or place on the startup queue + * @param viewArg the new view + */ + protected void handleOrDeferViewEvent(NetView viewArg) { + if (this.isJoining) { + // bug #44373 - queue all view messages while joining. + // This is done under the latestViewLock, but we can't block here because + // we're sitting in the UDP reader thread. + synchronized(startupLock) { + startupMessages.add(new StartupEvent(viewArg)); + return; + } + } + latestViewLock.writeLock().lock(); + try { + synchronized(startupLock) { + if (!processingEvents) { + startupMessages.add(new StartupEvent(viewArg)); + return; + } + } + // view processing can take a while, so we use a separate thread + // to avoid blocking a reader thread + NetView newView = viewArg; + long newId = viewArg.getViewId(); + LocalViewMessage v = new LocalViewMessage(address, newId, newView, + GMSMembershipManager.this); + + listener.messageReceived(v); + } finally { + latestViewLock.writeLock().unlock(); + } + } + + @Override + public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect, String reason) { + SuspectMember s = new SuspectMember(initiator, suspect, reason); + handleOrDeferSuspect(s); + } + + /** + * Process a new view object, or place on the startup queue + * @param suspectInfo the suspectee and suspector + */ + protected void handleOrDeferSuspect(SuspectMember suspectInfo) { + latestViewLock.writeLock().lock(); + try { + synchronized(startupLock) { + if (!processingEvents) { + return; + } + } + InternalDistributedMember suspect = suspectInfo.suspectedMember; + InternalDistributedMember who = suspectInfo.whoSuspected; + this.suspectedMembers.put(suspect, Long.valueOf(System.currentTimeMillis())); + try { + listener.memberSuspect(suspect, who, suspectInfo.reason); + } + catch (DistributedSystemDisconnectedException se) { + // let's not get huffy about it + } + } finally { + latestViewLock.writeLock().unlock(); + } + } + + /** + * Process a potential direct connect. Does not use + * the startup queue. It grabs the {@link #latestViewLock} + * and then processes the event. + *
+ * <p>
+ * It is a potential event, because we don't know until we've + * grabbed a stable view if this is really a new member. + * + * @param member + */ + private void processSurpriseConnect( + InternalDistributedMember member) + { + addSurpriseMember(member); + } + + /** + * Dispatch routine for processing a single startup event + * @param o the startup event to handle + */ + private void processStartupEvent(StartupEvent o) { + // Most common events first + + if (o.isDistributionMessage()) { // normal message + try { + dispatchMessage(o.dmsg); + } + catch (MemberShunnedException e) { + // message from non-member - ignore + } + } + else if (o.isGmsView()) { // view event + processView(o.gmsView.getViewId(), o.gmsView); + } + else if (o.isSurpriseConnect()) { // connect + processSurpriseConnect(o.member); + } + + else // sanity + throw new InternalGemFireError(LocalizedStrings.GroupMembershipService_UNKNOWN_STARTUP_EVENT_0.toLocalizedString(o)); + } + + /** + * Special mutex to create a critical section for + * {@link #startEventProcessing()} + */ + private final Object startupMutex = new Object(); + + + public void startEventProcessing() + { + // Only allow one thread to perform the work + synchronized (startupMutex) { + if (logger.isDebugEnabled()) + logger.debug("Membership: draining startup events."); + // Remove the backqueue of messages, but allow + // additional messages to be added. + for (;;) { + StartupEvent ev; + // Only grab the mutex while reading the queue. + // Other events may arrive while we're attempting to + // drain the queue. This is OK, we'll just keep processing + // events here until we've caught up. + synchronized (startupLock) { + int remaining = startupMessages.size(); + if (remaining == 0) { + // While holding the lock, flip the bit so that + // no more events get put into startupMessages, and + // notify all waiters to proceed. + processingEvents = true; + startupLock.notifyAll(); + break; // ...and we're done. + } + if (logger.isDebugEnabled()) { + logger.debug("Membership: {} remaining startup message(s)", remaining); + } + ev = (StartupEvent)startupMessages.removeFirst(); + } // startupLock + try { + processStartupEvent(ev); + } + catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + // If this ever returns, rethrow the error. We're poisoned + // now, so don't let this thread continue. + throw err; + } + catch (Throwable t) { + // Whenever you catch Error or Throwable, you must also + // catch VirtualMachineError (see above). However, there is + // _still_ a possibility that you are dealing with a cascading + // error condition, so you also need to check to see if the JVM + // is still usable: + SystemFailure.checkFailure(); + logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_ERROR_HANDLING_STARTUP_EVENT), t); + } + + } // for + if (logger.isDebugEnabled()) + logger.debug("Membership: finished processing startup events."); + } // startupMutex + } + + + public void waitForEventProcessing() throws InterruptedException { + // First check outside of a synchronized block. Cheaper and sufficient. + if (Thread.interrupted()) throw new InterruptedException(); + if (processingEvents) + return; + if (logger.isDebugEnabled()) { + logger.debug("Membership: waiting until the system is ready for events"); + } + for (;;) { + directChannel.getCancelCriterion().checkCancelInProgress(null); + synchronized (startupLock) { + // Now check using a memory fence and synchronization. 
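+ // processingEvents is volatile and is only set to true while holding + // startupLock (see startEventProcessing), so re-checking it inside this + // synchronized block cannot miss the transition.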
+ if (processingEvents) + break; + boolean interrupted = Thread.interrupted(); + try { + startupLock.wait(); + } + catch (InterruptedException e) { + interrupted = true; + directChannel.getCancelCriterion().checkCancelInProgress(e); + } + finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } // synchronized + } // for + if (logger.isDebugEnabled()) { + logger.debug("Membership: continuing"); + } + } + + /** + * for testing we need to validate the startup event list + */ + public List getStartupEvents() { + return this.startupMessages; + } + + public ReadWriteLock getViewLock() { + return this.latestViewLock; + } + + /** + * Returns a copy (possibly not current) of the current + * view (a list of {@link DistributedMember}s) + */ + public NetView getView() + { + // Grab the latest view under a mutex... + NetView v; + + latestViewLock.readLock().lock(); + v = latestView; + latestViewLock.readLock().unlock(); + + NetView result = new NetView(v, v.getViewId()); + + for (InternalDistributedMember m: v.getMembers()) { + if (isShunned(m)) { + result.remove(m); + } + } + + return result; + } + + /** + * test hook
+ * <p>
+ * The lead member is the eldest member with partition detection enabled.
+ * <p>
+ * If no members have partition detection enabled, there will be no + * lead member and this method will return null. + * @return the lead member associated with the latest view + */ + public DistributedMember getLeadMember() { + latestViewLock.readLock().lock(); + try { + return latestView == null? null : latestView.getLeadMember(); + } finally { + latestViewLock.readLock().unlock(); + } + } + + protected boolean isJoining() { + return this.isJoining; + } + + /** + * test hook + * @return the current membership view coordinator + */ + public DistributedMember getCoordinator() { + // note - we go straight to JoinLeave because the + // DistributionManager queues view changes in a serial executor, where + // they're asynchronously installed. The DS may still see the old coordinator + latestViewLock.readLock().lock(); + try { + return latestView == null? null : latestView.getCoordinator(); + } finally { + latestViewLock.readLock().unlock(); + } + } + + public boolean memberExists(DistributedMember m) { + latestViewLock.readLock().lock(); + NetView v = latestView; + latestViewLock.readLock().unlock(); + return v.getMembers().contains(m); + } + + /** + * Returns the identity associated with this member. WARNING: this value will + * be returned after the channel is closed, but in that case it is good for + * logging purposes only. :-) + */ + public InternalDistributedMember getLocalMember() + { + return address; + } + + public Services getServices() { + return services; + } + + public void postConnect() { + } + + /** + * @see SystemFailure#loadEmergencyClasses() + /** + * break any potential circularity in {@link #loadEmergencyClasses()} + */ + private static volatile boolean emergencyClassesLoaded = false; + + /** + * inhibits logging of ForcedDisconnectException to keep dunit logs clean + * while testing this feature + */ + protected static volatile boolean inhibitForceDisconnectLogging; + + /** + * Ensure that the critical classes from components + * get loaded. + * + * @see SystemFailure#loadEmergencyClasses() + */ + public static void loadEmergencyClasses() { + if (emergencyClassesLoaded) return; + emergencyClassesLoaded = true; + DirectChannel.loadEmergencyClasses(); + GMSJoinLeave.loadEmergencyClasses(); + GMSHealthMonitor.loadEmergencyClasses(); + } + /** + * Close the receiver, avoiding all potential deadlocks and + * eschewing any attempts at being graceful. + * + * @see SystemFailure#emergencyClose() + */ + public void emergencyClose() { + final boolean DEBUG = SystemFailure.TRACE_CLOSE; + + setShutdown(); + + // We can't call close() because they will allocate objects. Attempt + // a surgical strike and touch the important protocols. + + // MOST important, kill the FD protocols... + services.emergencyClose(); + + // Close the TCPConduit sockets... + if (directChannel != null) { + if (DEBUG) { + System.err.println("DEBUG: emergency close of DirectChannel"); + } + directChannel.emergencyClose(); + } + + if (DEBUG) { + System.err.println("DEBUG: done closing GroupMembershipService"); + } + } + + + /** + * in order to avoid split-brain occurring when a member is shutting down due to + * race conditions in view management we add it as a shutdown member when we receive + * a shutdown message. This is not the same as a SHUNNED member. 
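+ * <p>
+ * A minimal usage sketch (isShuttingDown below reads the same map):
+ * <pre>
+ * shutdownMessageReceived(member, "shutdown message received");
+ * assert isShuttingDown(member); // suspect processing is now suppressed
+ * </pre>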
+ */ + public void shutdownMessageReceived(InternalDistributedMember id, String reason) { + if (logger.isDebugEnabled()) { + logger.debug("Membership: recording shutdown status of {}", id); + } + synchronized(this.shutdownMembers) { + this.shutdownMembers.put(id, id); + services.getHealthMonitor().memberShutdown(id, reason); + services.getJoinLeave().memberShutdown(id, reason); + } + } + + /** + * returns true if a shutdown message has been received from the given address but + * that member is still in the membership view or is a surprise member. + */ + public boolean isShuttingDown(InternalDistributedMember mbr) { + synchronized(shutdownMembers) { + return shutdownMembers.containsKey(mbr); + } + } + + + public void shutdown() { + setShutdown(); + services.stop(); + } + + @Override + public void stop() { + + // [bruce] Do not null out the channel w/o adding appropriate synchronization + + logger.debug("MembershipManager closing"); + + if (directChannel != null) { + directChannel.disconnect(null); + + if (address != null) { + // Make sure that channel information is consistent + // Probably not important in this particular case, but just + // to be consistent... + latestViewLock.writeLock().lock(); + try { + destroyMember(address, false, "orderly shutdown"); + } finally { + latestViewLock.writeLock().unlock(); + } + } + } + + if (cleanupTimer != null) { + cleanupTimer.cancel(); + } + + if (logger.isDebugEnabled()) { + logger.debug("Membership: channel closed"); + } + } + + public void uncleanShutdown(String reason, final Exception e) { + inhibitForcedDisconnectLogging(false); + + if (services.getShutdownCause() == null) { + services.setShutdownCause(e); + } + + if (this.directChannel != null) { + this.directChannel.disconnect(e); + } + + // first shut down communication so we don't do any more harm to other + // members + services.emergencyClose(); + + if (e != null) { + try { + if (membershipTestHooks != null) { + List l = membershipTestHooks; + for (Iterator it=l.iterator(); it.hasNext(); ) { + MembershipTestHook dml = (MembershipTestHook)it.next(); + dml.beforeMembershipFailure(reason, e); + } + } + listener.membershipFailure(reason, e); + if (membershipTestHooks != null) { + List l = membershipTestHooks; + for (Iterator it=l.iterator(); it.hasNext(); ) { + MembershipTestHook dml = (MembershipTestHook)it.next(); + dml.afterMembershipFailure(reason, e); + } + } + } + catch (RuntimeException re) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_EXCEPTION_CAUGHT_WHILE_SHUTTING_DOWN), re); + } + } + } + + /** generate XML for the cache before shutting down due to forced disconnect */ + public void saveCacheXmlForReconnect() { + // there are two versions of this method so it can be unit-tested + boolean sharedConfigEnabled = services.getConfig().getDistributionConfig().getUseSharedConfiguration(); + saveCacheXmlForReconnect(sharedConfigEnabled); + } + + /** generate XML from the cache before shutting down due to forced disconnect */ + public void saveCacheXmlForReconnect(boolean sharedConfigEnabled) { + // first save the current cache description so reconnect can rebuild the cache + GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + if (cache != null && (cache instanceof Cache)) { + if (!Boolean.getBoolean("gemfire.autoReconnect-useCacheXMLFile") + && !cache.isSqlfSystem() && !sharedConfigEnabled) { + try { + logger.info("generating XML to rebuild the cache after reconnect completes"); + StringPrintWriter pw = new StringPrintWriter(); + 
CacheXmlGenerator.generate((Cache)cache, pw, true, false); + String cacheXML = pw.toString(); + cache.getCacheConfig().setCacheXMLDescription(cacheXML); + logger.info("XML generation completed: {}", cacheXML); + } catch (CancelException e) { + logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_PROBLEM_GENERATING_CACHE_XML), e); + } + } else if (sharedConfigEnabled && !cache.getCacheServers().isEmpty()) { + // we need to retain a cache-server description if this JVM was started by gfsh + List list = new ArrayList(cache.getCacheServers().size()); + for (Iterator it = cache.getCacheServers().iterator(); it.hasNext(); ) { - CacheServer cs = (CacheServer)it.next(); - CacheServerCreation bsc = new CacheServerCreation(cache, cs); - list.add(bsc); ++ CacheServerImpl cs = (CacheServerImpl)it.next(); ++ if (cs.isDefaultServer()) { ++ CacheServerCreation bsc = new CacheServerCreation(cache, cs); ++ list.add(bsc); ++ } + } + cache.getCacheConfig().setCacheServerCreation(list); + logger.info("CacheServer configuration saved"); + } + } + } + + public boolean requestMemberRemoval(DistributedMember mbr, String reason) { + if (mbr.equals(this.address)) { + return false; + } + logger.warn(LocalizedMessage.create( + LocalizedStrings.GroupMembershipService_MEMBERSHIP_REQUESTING_REMOVAL_OF_0_REASON_1, + new Object[] {mbr, reason})); + try { + services.getJoinLeave().remove((InternalDistributedMember)mbr, reason); + } + catch (RuntimeException e) { + Throwable problem = e; + if (services.getShutdownCause() != null) { + Throwable cause = services.getShutdownCause(); + // If ForcedDisconnectException occurred then report it as actual + // problem. + if (cause instanceof ForcedDisconnectException) { + problem = (Exception) cause; + } else { + Throwable ne = problem; + while (ne.getCause() != null) { + ne = ne.getCause(); + } + try { + ne.initCause(services.getShutdownCause()); + } + catch (IllegalArgumentException selfCausation) { + // fix for bug 38895 - the cause is already in place + } + } + } + if (!services.getConfig().getDistributionConfig().getDisableAutoReconnect()) { + saveCacheXmlForReconnect(); + } + listener.membershipFailure("Channel closed", problem); + throw new DistributedSystemDisconnectedException("Channel closed", problem); + } + return true; + } + + public void suspectMembers(Set members, String reason) { + for (Iterator it=members.iterator(); it.hasNext(); ) { + verifyMember((DistributedMember)it.next(), reason); + } + } + + public void suspectMember(DistributedMember mbr, String reason) { + if (!this.shutdownInProgress && !this.shutdownMembers.containsKey(mbr)) { + verifyMember(mbr, reason); + } + } + + /* like memberExists() this checks to see if the given ID is in the current + * membership view. If it is in the view though we try to contact it + * to see if it's still around. 
If we can't contact it then + * suspect messages are sent to initiate final checks + * @param mbr the member to verify + * @param reason why the check is being done (must not be blank/null) + * @return true if the member checks out + */ + public boolean verifyMember(DistributedMember mbr, String reason) { + if (mbr != null && memberExists((InternalDistributedMember)mbr)) { + return this.services.getHealthMonitor().checkIfAvailable(mbr, reason, true); + } + return false; + } + + /** + * Perform the grossness associated with sending a message over + * a DirectChannel + * + * @param destinations the list of destinations + * @param content the message + * @param theStats the statistics object to update + * @return all recipients who did not receive the message (null if + * all received it) + * @throws NotSerializableException if the message is not serializable + */ + protected Set directChannelSend(InternalDistributedMember[] destinations, + DistributionMessage content, + DMStats theStats) + throws NotSerializableException + { + boolean allDestinations; + InternalDistributedMember[] keys; + if (content.forAll()) { + allDestinations = true; + latestViewLock.writeLock().lock(); + try { + List keySet = latestView.getMembers(); + keys = new InternalDistributedMember[keySet.size()]; + keys = (InternalDistributedMember[])keySet.toArray(keys); + } finally { + latestViewLock.writeLock().unlock(); + } + } + else { + allDestinations = false; + keys = destinations; + } + + int sentBytes = 0; + try { + sentBytes = directChannel.send(this, keys, content, + this.services.getConfig().getDistributionConfig().getAckWaitThreshold(), + this.services.getConfig().getDistributionConfig().getAckSevereAlertThreshold()); + + if (theStats != null) { + theStats.incSentBytes(sentBytes); + } + + if (sentBytes == 0) { + if (services.getCancelCriterion().cancelInProgress() != null) { + throw new DistributedSystemDisconnectedException(); + } + } + } + catch (DistributedSystemDisconnectedException ex) { + if (services.getShutdownCause() != null) { + throw new DistributedSystemDisconnectedException("DistributedSystem is shutting down", services.getShutdownCause()); + } else { + throw ex; // see bug 41416 + } + } + catch (ConnectExceptions ex) { + if (allDestinations) + return null; + + List members = ex.getMembers(); // We need to return this list of failures + + // SANITY CHECK: If we fail to send a message to an existing member + // of the view, we have a serious error (bug36202). 
+      NetView view = services.getJoinLeave().getView(); // grab a recent view, excluding shunned members
+
+      // Iterate through members and causes in tandem :-(
+      Iterator it_mem = members.iterator();
+      Iterator it_causes = ex.getCauses().iterator();
+      while (it_mem.hasNext()) {
+        InternalDistributedMember member = (InternalDistributedMember)it_mem.next();
+        Throwable th = (Throwable)it_causes.next();
+
+        if (!view.contains(member) || (th instanceof ShunnedMemberException)) {
+          continue;
+        }
+        logger.fatal(LocalizedMessage.create(
+            LocalizedStrings.GroupMembershipService_FAILED_TO_SEND_MESSAGE_0_TO_MEMBER_1_VIEW_2,
+            new Object[] {content, member, view}), th);
+//        Assert.assertTrue(false, "messaging contract failure");
+      }
+      return new HashSet(members);
+    } // catch ConnectExceptions
+    catch (ToDataException | CancelException e) {
+      throw e;
+    }
+    catch (IOException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
+      }
+      if (e instanceof NotSerializableException) {
+        throw (NotSerializableException)e;
+      }
+    }
+    catch (RuntimeException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
+      }
+      throw e;
+    }
+    catch (Error e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
+      }
+      throw e;
+    }
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see com.gemstone.gemfire.distributed.internal.membership.MembershipManager#isConnected()
+   */
+  public boolean isConnected() {
+    return (this.hasJoined && !this.shutdownInProgress);
+  }
+
+  /**
+   * Returns true if the distributed system is in the process of auto-reconnecting.
+   * Otherwise returns false.
+   */
+  public boolean isReconnectingDS() {
+    if (this.hasJoined) {
+      return false;
+    } else {
+      return this.wasReconnectingSystem;
+    }
+  }
+
+  @Override
+  public QuorumChecker getQuorumChecker() {
+    if (!services.isShutdownDueToForcedDisconnect()) {
+      return null;
+    }
+    if (this.quorumChecker != null) {
+      return this.quorumChecker;
+    }
+
+    QuorumChecker impl = services.getMessenger().getQuorumChecker();
+    this.quorumChecker = impl;
+    return impl;
+  }
+
+  @Override
+  public void releaseQuorumChecker(QuorumChecker checker) {
+    ((GMSQuorumChecker)checker).suspend();
+    InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
+    if (system == null || !system.isConnected()) {
+      checker.close();
+    }
+  }
+
+  public Set send(InternalDistributedMember dest, DistributionMessage msg)
+      throws NotSerializableException {
+
+    InternalDistributedMember dests[] = new InternalDistributedMember[] { dest };
+    return send(dests, msg, null);
+  }
+
+  public Set send(InternalDistributedMember[] destinations,
+      DistributionMessage msg,
+      DMStats theStats)
+      throws NotSerializableException
+  {
+    Set result = null;
+    boolean allDestinations = msg.forAll();
+
+    if (services.getCancelCriterion().cancelInProgress() != null) {
+      throw new DistributedSystemDisconnectedException("Distributed System is shutting down",
+          services.getCancelCriterion().generateCancelledException(null));
+    }
+
+    if (playingDead) { // wellness test hook
+      while (playingDead && !shutdownInProgress) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    if (isJoining()) {
+      // If we get here, we are starting up, so just report a failure.
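+      // (send() reports the set of recipients that did not receive the
+      // message, with null meaning delivery is not in doubt; failures are
+      // never enumerated for broadcasts, so while still joining we return
+      // null for a broadcast and the full destination list otherwise.)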
+      if (allDestinations)
+        return null;
+      else {
+        result = new HashSet();
+        for (int i = 0; i < destinations.length; i++)
+          result.add(destinations[i]);
+        return result;
+      }
+    }
+
+    if (msg instanceof AdminMessageType
+        && this.shutdownInProgress) {
+      // no admin messages while shutting down - this can cause threads to hang
+      return new HashSet(Arrays.asList(msg.getRecipients()));
+    }
+
+    // Handle trivial cases
+    if (destinations == null) {
+      if (logger.isTraceEnabled())
+        logger.trace("Membership: Message send: returning early because null set passed in: '{}'", msg);
+      return null; // trivially: all recipients received the message
+    }
+    if (destinations.length == 0) {
+      if (logger.isTraceEnabled())
+        logger.trace("Membership: Message send: returning early because empty destination list passed in: '{}'", msg);
+      return null; // trivially: all recipients received the message
+    }
+
+    msg.setSender(address);
+
+    msg.setBreadcrumbsInSender();
+    Breadcrumbs.setProblem(null);
+
+    boolean useMcast = false;
+    if (mcastEnabled) {
+      useMcast = (msg.getMulticast() || allDestinations);
+    }
+
+    boolean sendViaMessenger = isForceUDPCommunications(); // enable when bug #46438 is fixed: || msg.sendViaUDP();
+
+    if (useMcast || tcpDisabled || sendViaMessenger) {
+      checkAddressesForUUIDs(destinations);
+      result = services.getMessenger().send(msg);
+    }
+    else {
+      result = directChannelSend(destinations, msg, theStats);
+    }
+
+    // If the message was a broadcast, don't enumerate failures.
+    if (allDestinations)
+      return null;
+    else {
+      return result;
+    }
+  }
+
+  // MembershipManager method
+  @Override
+  public void forceUDPMessagingForCurrentThread() {
+    forceUseUDPMessaging.set(null);
+  }
+
+  void checkAddressesForUUIDs(InternalDistributedMember[] addresses) {
+    for (int i = 0; i < addresses.length; i++) {
+      InternalDistributedMember m = addresses[i];
+      if (m != null && !((GMSMember)m.getNetMember()).hasUUID()) {
+        latestViewLock.readLock().lock();
+        try {
+          addresses[i] = latestView.getCanonicalID(addresses[i]);
+        } finally {
+          latestViewLock.readLock().unlock();
+        }
+      }
+    }
+  }
+
+  /**
+   * Indicate whether the given member is being shunned. This also checks
+   * how long ago the member was shunned and, as a side effect, removes
+   * its entry if its sunset has passed.
+   *
+   * Concurrency: protected by {@link #latestViewLock} ReentrantReadWriteLock
+   *
+   * @param m the member in question
+   * @return true if the given member is shunned
+   */
+  public boolean isShunned(DistributedMember m) {
+    if (!shunnedMembers.containsKey(m)) {
+      return false;
+    }
+    latestViewLock.writeLock().lock();
+    try {
+      // Make sure the entry isn't stale...
+      long shunTime = ((Long)shunnedMembers.get(m)).longValue();
+      long now = System.currentTimeMillis();
+      if (shunTime + SHUNNED_SUNSET * 1000 > now)
+        return true;
+
+      // Oh, it _is_ stale. Remove it while we're here.
+      endShun(m);
+      return false;
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Indicate whether the given member is in the surprise member list
+   * <P>
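+   * A surprise member is one that came to our attention through messaging
+   * before any membership view included it; its arrival time is recorded
+   * so that it can be sunset if no view ever admits it.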
+   * Unlike isShunned, this method will not cause expiry of a surprise member.
+   * That must be done during view processing.
+   * <p>
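+   * (This method takes only the read side of the view lock, so it must not
+   * mutate the surprise-member collection; expiry happens under the write
+   * lock during view processing.)
+   * <p>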
+   * Like isShunned, this method holds the view lock while executing
+   *
+   * Concurrency: protected by {@link #latestViewLock} ReentrantReadWriteLock
+   *
+   * @param m the member in question
+   * @return true if the given member is a surprise member
+   */
+  public boolean isSurpriseMember(DistributedMember m) {
+    latestViewLock.readLock().lock();
+    try {
+      if (surpriseMembers.containsKey(m)) {
+        long birthTime = ((Long)surpriseMembers.get(m)).longValue();
+        long now = System.currentTimeMillis();
+        return (birthTime >= (now - this.surpriseMemberTimeout));
+      }
+      return false;
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * for testing we need to be able to inject surprise members into
+   * the view to ensure that sunsetting works properly
+   * @param m the member ID to add
+   * @param birthTime the millisecond clock time that the member was first seen
+   */
+  public void addSurpriseMemberForTesting(DistributedMember m, long birthTime) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("test hook is adding surprise member {} birthTime={}", m, birthTime);
+    }
+    latestViewLock.writeLock().lock();
+    try {
+      surpriseMembers.put((InternalDistributedMember)m, Long.valueOf(birthTime));
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * returns the surpriseMemberTimeout interval, in milliseconds
+   */
+  public int getSurpriseMemberTimeout() {
+    return this.surpriseMemberTimeout;
+  }
+
+  private boolean endShun(DistributedMember m) {
+    boolean wasShunned = (shunnedMembers.remove(m) != null);
+    shunnedAndWarnedMembers.remove(m);
+    return wasShunned;
+  }
+
+  /**
+   * Add the given member to the shunned list. Also, purge any shunned
+   * members that are really really old.
+   * <p>
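+   * While shunned, a member answers true from isShunned and its messages
+   * are ignored; entries expire once SHUNNED_SUNSET seconds have elapsed.
+   * <p>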
+   * Must be called with {@link #latestViewLock} held and
+   * the view stable.
+   *
+   * @param m the member to add
+   */
+  protected void addShunnedMember(InternalDistributedMember m) {
+    long deathTime = System.currentTimeMillis() - SHUNNED_SUNSET * 1000;
+
+    surpriseMembers.remove(m); // for safety
+
+    // Update the shunned set.
+    if (!isShunned(m)) {
+      shunnedMembers.put(m, Long.valueOf(System.currentTimeMillis()));
+    }
+
+    // Remove really really old shunned members.
+    // First, make a copy of the old set. New arrivals _a priori_ don't matter,
+    // and we're going to be updating the list so we don't want to disturb
+    // the iterator.
+    Set oldMembers = new HashSet(shunnedMembers.entrySet());
+
+    Set removedMembers = new HashSet();
+
+    Iterator it = oldMembers.iterator();
+    while (it.hasNext()) {
+      Map.Entry e = (Map.Entry)it.next();
+
+      // Key is the member. Value is the time it was shunned.
+      long ll = ((Long)e.getValue()).longValue();
+      if (ll >= deathTime) {
+        continue; // too new.
+      }
+
+      InternalDistributedMember mm = (InternalDistributedMember)e.getKey();
+
+      if (latestView.contains(mm)) {
+        // Fault tolerance: a shunned member can conceivably linger but never
+        // disconnect.
+        //
+        // We may not delete it at the time that we shun it because the view
+        // isn't necessarily stable. (Note that a well-behaved cache member
+        // will depart on its own accord, but we force the issue here.)
+        destroyMember(mm, true, "shunned but never disconnected");
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("Membership: finally removed shunned member entry <{}>", mm);
+      }
+
+      removedMembers.add(mm);
+    }
+
+    // remove timed-out entries from the shunned-members collections
+    if (removedMembers.size() > 0) {
+      it = removedMembers.iterator();
+      while (it.hasNext()) {
+        InternalDistributedMember idm = (InternalDistributedMember)it.next();
+        endShun(idm);
+      }
+    }
+  }
+
+
+  /**
+   * Retrieve thread-local data for transport to another thread in hydra
+   */
+  public Object getThreadLocalData() {
+    Map result = new HashMap();
+    return result;
+  }
+
+  /**
+   * for testing verification purposes, this returns the port for the
+   * direct channel, or zero if there is no direct channel
+   */
+  public int getDirectChannelPort() {
+    return directChannel == null ? 0 : directChannel.getPort();
+  }
+
+  /**
+   * for mock testing this allows insertion of a DirectChannel mock
+   */
+  protected void setDirectChannel(DirectChannel dc) {
+    this.directChannel = dc;
+    this.tcpDisabled = false;
+  }
+
+  /* non-thread-owned serial channels and high priority channels are not
+   * included
+   */
+  public Map getMessageState(DistributedMember member, boolean includeMulticast) {
+    Map result = new HashMap();
+    DirectChannel dc = directChannel;
+    if (dc != null) {
+      dc.getChannelStates(member, result);
+    }
+    services.getMessenger().getMessageState((InternalDistributedMember)member, result, includeMulticast);
+    return result;
+  }
+
+  public void waitForMessageState(DistributedMember otherMember, Map state)
+      throws InterruptedException
+  {
+    if (Thread.interrupted()) throw new InterruptedException();
+    DirectChannel dc = directChannel;
+    if (dc != null) {
+      dc.waitForChannelState(otherMember, state);
+    }
+    services.getMessenger().waitForMessageState((InternalDistributedMember)otherMember, state);
+  }
+
+  /*
+   * (non-Javadoc)
+   * MembershipManager method: wait for the given member to be gone. Throws TimeoutException if
+   * the wait goes too long
+   * @see com.gemstone.gemfire.distributed.internal.membership.MembershipManager#waitForDeparture(com.gemstone.gemfire.distributed.DistributedMember)
+   */
+  public boolean waitForDeparture(DistributedMember mbr) throws TimeoutException, InterruptedException {
+    if (Thread.interrupted()) throw new InterruptedException();
+    boolean result = false;
+    DirectChannel dc = directChannel;
+    InternalDistributedMember idm = (InternalDistributedMember)mbr;
+    int memberTimeout = this.services.getConfig().getDistributionConfig().getMemberTimeout();
+    long pauseTime = (memberTimeout < 1000) ? 100 : memberTimeout / 10;
+    boolean wait;
+    int numWaits = 0;
+    do {
+      wait = false;
+      if (dc != null) {
+        if (dc.hasReceiversFor(idm)) {
+          wait = true;
+        }
+        if (wait && logger.isDebugEnabled()) {
+          logger.debug("waiting for receivers for {} to shut down", mbr);
+        }
+      }
+      if (!wait) {
+        latestViewLock.readLock().lock();
+        try {
+          wait = this.latestView.contains(idm);
+        } finally {
+          latestViewLock.readLock().unlock();
+        }
+        if (wait && logger.isDebugEnabled()) {
+          logger.debug("waiting for {} to leave the membership view", mbr);
+        }
+      }
+      if (!wait) {
+        // run a message through the member's serial execution queue to ensure that all of its
+        // current messages have been processed
+        OverflowQueueWithDMStats serialQueue = listener.getDM().getSerialQueue(idm);
+        if (serialQueue != null) {
+          final boolean done[] = new boolean[1];
+          final FlushingMessage msg = new FlushingMessage(done);
+          serialQueue.add(new SizeableRunnable(100) {
+            public void run() {
+              msg.invoke();
+            }
+            public String toString() {
+              return "Processing fake message";
+            }
+          });
+          synchronized (done) {
+            while (done[0] == false) {
+              done.wait(10);
+            }
+            result = true;
+          }
+        }
+      }
+      if (wait) {
+        numWaits++;
+        if (numWaits > 40) {
+          // waited over 4 * memberTimeout ms. Give up at this point
+          throw new TimeoutException("waited too long for " + idm + " to be removed");
+        }
+        Thread.sleep(pauseTime);
+      }
+    } while (wait && (dc != null && dc.isOpen())
+        && services.getCancelCriterion().cancelInProgress() == null);
+    if (logger.isDebugEnabled()) {
+      logger.debug("operations for {} should all be in the cache at this point", mbr);
+    }
+    return result;
+  }
+
+
+  // TODO remove this overly complex method and replace its use with
+  // waitForViewChange using the remote member's view ID
+  public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
+    boolean foundRemoteId = false;
+    CountDownLatch currentLatch = null;
+    // ARB: preconditions
+    // remoteId != null
+    latestViewLock.writeLock().lock();
+    try {
+      if (latestView == null) {
+        // Not sure how this would happen, but see bug 38460.
+        // No view?? Not found!
+      }
+      else if (latestView.contains(remoteId)) {
+        // ARB: check if remoteId is already in the membership view.
+        // If not, then create a latch if needed and wait for the latch to open.
+        foundRemoteId = true;
+      }
+      else if ((currentLatch = (CountDownLatch)this.memberLatch.get(remoteId)) == null) {
+        currentLatch = new CountDownLatch(1);
+        this.memberLatch.put(remoteId, currentLatch);
+      }
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+
+    if (!foundRemoteId) {
+      try {
+        if (currentLatch.await(membershipCheckTimeout, TimeUnit.MILLISECONDS)) {
+          foundRemoteId = true;
+          // @todo
+          // ARB: remove latch from memberLatch map if this is the last thread waiting on the latch.
+        }
+      }
+      catch (InterruptedException ex) {
+        // ARB: latch attempt was interrupted.
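+        // Restore the interrupt flag so that callers further up the stack
+        // can see that this thread was interrupted.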
+        Thread.currentThread().interrupt();
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.GroupMembershipService_THE_MEMBERSHIP_CHECK_WAS_TERMINATED_WITH_AN_EXCEPTION));
+      }
+    }
+
+    // ARB: postconditions
+    // (foundRemoteId == true) ==> (currentLatch is non-null ==> currentLatch is open)
+    return foundRemoteId;
+  }
+
+  /* returns the cause of shutdown, if known */
+  public Throwable getShutdownCause() {
+    return services.getShutdownCause();
+  }
+
+//  @Override
+//  public void membershipFailure(String reason, Exception e) {
+//