geode-commits mailing list archives

From bschucha...@apache.org
Subject [10/14] incubator-geode git commit: jgroups source is now removed from asf branch GEODE-77 and is a downloaded dependency. Able to start/stop a locator and a server via gfsh but there is no HA or authentication in the membership system.
Date Mon, 27 Jul 2015 20:27:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d2a942e8/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
new file mode 100755
index 0000000..b15caf9
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -0,0 +1,3006 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.distributed.internal.membership.gms.mgr;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.ForcedDisconnectException;
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.SystemConnectException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.ToDataException;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.BoundedLinkedHashMap;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.DSClock;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionException;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.distributed.internal.SizeableRunnable;
+import com.gemstone.gemfire.distributed.internal.StartupMessage;
+import com.gemstone.gemfire.distributed.internal.ThrottlingMemLinkedQueueWithDMStats;
+import com.gemstone.gemfire.distributed.internal.direct.DirectChannel;
+import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
+import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
+import com.gemstone.gemfire.distributed.internal.membership.MembershipTestHook;
+import com.gemstone.gemfire.distributed.internal.membership.NetMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.SuspectMember;
+import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
+import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsQuorumChecker;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.SystemTimer;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.xmlcache.BridgeServerCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.shared.StringPrintWriter;
+import com.gemstone.gemfire.internal.tcp.ConnectExceptions;
+import com.gemstone.gemfire.internal.tcp.ConnectionException;
+import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
+import com.gemstone.gemfire.internal.tcp.Stub;
+import com.gemstone.gemfire.internal.tcp.TCPConduit;
+import com.gemstone.gemfire.internal.util.Breadcrumbs;
+
+public class GMSMembershipManager implements MembershipManager, Manager
+{
+  private static final Logger logger = Services.getLogger();
+  
+  /** true when a member with an older product version is in the view; disables multicast during a rolling upgrade */
+  volatile boolean disableMulticastForRollingUpgrade;
+  
+  /**
+   * set to true if the distributed system that created this manager was
+   * auto-reconnecting when it was created.
+   */
+  boolean wasReconnectingSystem;
+  
+  /**
+   * A quorum checker is created during reconnect and is held
+   * here so it is available to the UDP protocol for passing off
+   * the ping-pong responses used in the quorum-checking algorithm. 
+   */
+  private volatile JGroupsQuorumChecker quorumChecker;
+  
+  /**
+   * during an auto-reconnect attempt set this to the old DistributedSystem's
+   * UDP port socket.  The failure detection protocol will pick it up and use it.
+   */
+  private volatile DatagramSocket oldDSUDPSocket;
+  
+  /**
+   * A general use timer
+   */
+  private Timer timer = new Timer("Membership Timer", true);
+  
+  
+  /**
+   * Trick class to make the startup synch more
+   * visible in stack traces
+   * 
+   * @see GMSMembershipManager#startupLock
+   */
+  static class EventProcessingLock  {
+    public EventProcessingLock() {
+    }
+  }
+  
+  /**
+   * Trick class to make the view lock more visible
+   * in stack traces
+   * 
+   */
+  static class ViewLock extends ReentrantReadWriteLock {
+    public ViewLock() {
+    }
+  }
+  
+  static class StartupEvent  {
+    static final int DEPARTURE = 1;
+    static final int CONNECT = 2;
+    static final int VIEW = 3;
+    static final int MESSAGE = 4;
+    
+    /**
+     * indicates whether the event is a departure, a surprise connect
+     * (i.e., before the view message arrived), a view, or a regular
+     * message
+     * 
+     * @see #DEPARTURE
+     * @see #CONNECT
+     * @see #VIEW
+     * @see #MESSAGE
+     */
+    private int kind;
+    
+    // Miscellaneous state depending on the kind of event
+    InternalDistributedMember member;
+    boolean crashed;
+    String reason;
+    DistributionMessage dmsg;
+    Stub stub;
+    NetView gmsView;
+    
+    @Override
+    public String toString() {
+      StringBuffer sb = new StringBuffer();
+      sb.append("kind=");
+      switch (kind) {
+      case DEPARTURE:
+        sb.append("departure; member = <")
+          .append(member)
+          .append(">; crashed = ")
+          .append(crashed)
+          .append("; reason = ");
+        if (reason != null && (reason.indexOf("NoSuchMemberException") >= 0)) {
+          sb.append(LocalizedStrings.GroupMembershipService_TCPIP_CONNECTIONS_CLOSED.toLocalizedString());
+        }
+        else {
+          sb.append(reason);
+        }
+        break;
+      case CONNECT:
+        sb.append("connect; member = <" + member + ">; stub = " + stub);
+        break;
+      case VIEW:
+        String text = gmsView.toString();
+        sb.append("view <" + text + ">");
+        break;
+      case MESSAGE:
+        sb.append("message <" + dmsg + ">");
+        break;
+      default:
+        sb.append("unknown=<" + kind + ">");
+        break;
+      }
+      return sb.toString();
+    }
+    /**
+     * Create a departure event
+     * @param dm the member that left
+     * @param crashed true if this member crashed
+     * @param reason reason string, esp. if crashed
+     */
+    StartupEvent(InternalDistributedMember dm, boolean crashed, String reason) {
+      this.kind = DEPARTURE;
+      this.member = dm;
+      this.crashed = crashed;
+      this.reason = reason;
+    }
+    /**
+     * Indicate if this is a departure
+     * @return true if this is a departure event
+     */
+    boolean isDepartureEvent() {
+      return this.kind == DEPARTURE;
+    }
+
+    /**
+     * Create a connect event
+     * @param member the member connecting
+     * @param id the stub
+     */
+    StartupEvent(final InternalDistributedMember member, final Stub id) {
+      this.kind = CONNECT;
+      this.member = member;
+      this.stub = id;
+    }
+    /**
+     * Indicate if this is a connect event
+     * @return true if this is a connect event
+     */
+    boolean isConnect() {
+      return this.kind == CONNECT;
+    }
+
+    /**
+     * Create a view event
+     * @param v the new view
+     */
+    StartupEvent(NetView v) {
+      this.kind = VIEW;
+      this.gmsView = v;
+    }
+    
+    /**
+     * Indicate if this is a view event
+     * @return true if this is a view event
+     */
+    boolean isGmsView() {
+      return this.kind == VIEW;
+    }
+
+    /**
+     * Create a message event
+     * @param d the message
+     */
+    StartupEvent(DistributionMessage d) {
+      this.kind = MESSAGE;
+      this.dmsg = d;
+    }
+    /**
+     * Indicate if this is a message event
+     * @return true if this is a message event
+     */
+    boolean isDistributionMessage() {
+      return this.kind == MESSAGE;
+    }
+  }
+  
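+  /** timeout used for the security peer membership (verify-member) check;
+      initialized from the distribution config in init() */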
+  private int membershipCheckTimeout = DistributionConfig.DEFAULT_SECURITY_PEER_VERIFYMEMBER_TIMEOUT;
+
+  /**
+   * This object synchronizes threads waiting for
+   * startup to finish.  Updates to {@link #startupMessages}
+   * are synchronized through this object.
+   */
+  protected final EventProcessingLock startupLock = new EventProcessingLock();
+  
+  /**
+   * This is the latest view (ordered list of DistributedMembers) 
+   * that has been returned from JGroups.
+   * 
+   * All accesses to this object are protected via {@link #latestViewLock}
+   */
+  protected NetView latestView = new NetView();
+
+  /**
+   * This is the lock for protecting access to latestView
+   * 
+   * @see #latestView
+   */
+  protected ViewLock latestViewLock = new ViewLock();
+  
+  /**
+   * This is the listener that accepts our membership events
+   */
+  protected com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener listener;
+
+  /**
+   * Membership failure listeners - for testing
+   */
+  List membershipTestHooks;
+  
+  /**
+   * This is a representation of the local member (ourself)
+   */
+  protected InternalDistributedMember address = null; // new DistributedMember(-1);
+
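+  /** the direct channel used for TCP/IP messaging; null when TCP is disabled in the config */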
+  protected DirectChannel directChannel;
+
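+  /** the TCP conduit obtained from the direct channel after joining the distributed system */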
+  protected TCPConduit conduit;
+  
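+  /** wraps our membership listener so that direct-channel events can be filtered or deferred */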
+  protected MyDCReceiver dcReceiver;
+  
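+  /** true while join() is in progress; view events arriving in this window are queued */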
+  volatile boolean isJoining;
+  
+  /** has the jgroups channel been connected successfully? */
+  volatile boolean hasConnected;
+  
+  /**
+   * a map keyed on InternalDistributedMember, values are Stubs that represent direct
+   * channels to other systems
+   * 
+   * Accesses must be under the read or write lock of {@link #latestViewLock}.
+   */
+  protected final Map memberToStubMap = new ConcurrentHashMap();
+
+  /**
+   * a map of direct channels (Stub) to InternalDistributedMember. key instanceof Stub
+   * value instanceof InternalDistributedMember
+   * 
+   * Accesses must be under the read or write lock of {@link #latestViewLock}.
+   */
+  protected final Map stubToMemberMap = new ConcurrentHashMap();
+  
+  /**
+   * Members of the distributed system that we believe have shut down.
+   * Keys are instances of {@link InternalDistributedMember}, values are 
+   * Longs indicating the time this member was shunned.
+   * 
+   * Members are removed after {@link #SHUNNED_SUNSET} seconds have
+   * passed.
+   * 
+   * Accesses to this map need to be under the read or write lock of {@link #latestViewLock}.
+   * 
+   * @see System#currentTimeMillis()
+   */
+//  protected final Set shunnedMembers = Collections.synchronizedSet(new HashSet());
+  protected final Map shunnedMembers = new ConcurrentHashMap();
+
+  /**
+   * Members that have sent a shutdown message.  This is used to suppress
+   * suspect processing in JGroups that otherwise becomes pretty aggressive 
+   * when a member is shutting down.
+   */
+  private final Map shutdownMembers = new BoundedLinkedHashMap(1000);
+  
+  /**
+   * per bug 39552, keep a list of members that have been shunned and for
+   * which a message is printed.  Contents of this list are cleared at the
+   * same time they are removed from {@link #shunnedMembers}.
+   * 
+   * Accesses to this set need to be under the read or write lock of {@link #latestViewLock}.
+   */
+  protected final HashSet shunnedAndWarnedMembers = new HashSet();
+  /**
+   * The identities and birth-times of others that we have allowed into
+   * membership at the distributed system level, but have not yet appeared
+   * in a jgroups view.
+   * <p>
+   * Keys are instances of {@link InternalDistributedMember}, values are
+   * Longs indicating the time the member was first seen (its birth time).
+   * <p>
+   * Members are removed when a view containing them is processed.  If,
+   * after {@link #surpriseMemberTimeout} milliseconds have passed, a view
+   * containing the member has not arrived, the member is removed from
+   * membership and member-left notification is performed.
+   * <p>
+   * Accesses to this map need to be under the read or write lock of {@link #latestViewLock}.
+   * 
+   * @see System#currentTimeMillis()
+   */
+  protected final Map<InternalDistributedMember, Long> surpriseMembers = new ConcurrentHashMap();
+
+  /**
+   * the timeout interval for surprise members.  This is calculated from 
+   * the member-timeout setting
+   */
+  protected int surpriseMemberTimeout;
+  
+  /**
+   * JGroups can skip views and omit telling us about a crashed member.
+   * This map holds a history of suspected members that we use to detect
+   * crashes.
+   */
+  private final Map<InternalDistributedMember, Long> suspectedMembers = new ConcurrentHashMap();
+  
+  /**
+   * the timeout interval for suspected members
+   */
+  private final long suspectMemberTimeout = 180000;
+  
+  /** sleep period, in millis, that the user of this manager should slumber after creating
+      the manager.  This is advice from the JChannel itself when it detects a concurrent
+      startup race condition that requires a settling period. */
+  private long channelPause = 0;  
+
+  /**
+   * Length of time, in seconds, that a member is retained in the zombie set
+   * 
+   * @see #shunnedMembers
+   */
+  static private final int SHUNNED_SUNSET = Integer.getInteger(
+      "gemfire.shunned-member-timeout", 300).intValue();
+  
+  /**
+   * Set to true when the service should stop.
+   */
+  protected volatile boolean shutdownInProgress = false;
+  
+  /**
+   * Set to true when upcalls should be generated for
+   * events.
+   */
+  protected volatile boolean processingEvents = false;
+  
+  /**
+   * This is the latest viewId received from JGroups
+   */
+  long latestViewId = -1;
+  
+  /** distribution manager statistics */
+  DMStats stats;
+
+  /**
+   A list of messages received during channel startup that couldn't be processed yet.
+   Additions to or removals from this list must be synchronized
+   via {@link #startupLock}.
+   @since 5.0
+   */
+  protected LinkedList<StartupEvent> startupMessages = new LinkedList<StartupEvent>();
+  
+  /** the type of vm we're running in. This is also in the membership id, 
+      but is needed by some methods before the membership id has been
+      created. */
+  int vmKind;
+
+  /**
+   * ARB: the map of latches is used to block peer handshakes till
+   * authentication is confirmed.
+   */
+  final private HashMap memberLatch = new HashMap();
+  
+  /**
+   * Insert our own MessageReceiver between us and the direct channel, in order
+   * to correctly filter membership events.
+   * 
+   * @author jpenney
+   * 
+   */
+  class MyDCReceiver implements DistributedMembershipListener
+  {
+
+    DistributedMembershipListener upCall;
+    
+    /**
+     * Don't provide events until the caller has told us we are ready.
+     * 
+     * Synchronization provided via GroupMembershipService.class.
+     * 
+     * Note that in practice we only need to delay accepting the first
+     * client; we don't need to put this check before every call...
+     *
+     */
+   MyDCReceiver(DistributedMembershipListener up) {
+      upCall = up;
+    }
+
+    public void messageReceived(DistributionMessage msg)
+    {
+      // bug 36851 - notify failure detection that we've had contact from a member
+      services.getHealthMonitor().contactedBy(msg.getSender());
+      handleOrDeferMessage(msg);
+    }
+
+    public void newMemberConnected(final InternalDistributedMember member, final Stub id)
+    {
+      handleOrDeferSurpriseConnect(member, id);
+    }
+
+    public void memberDeparted(InternalDistributedMember id, boolean crashed, String reason)
+    {
+      try {
+        handleOrDeferRemove(id, crashed, reason);
+      }
+      catch (DistributedSystemDisconnectedException ignore) {
+        // ignore
+      }
+      catch (RuntimeException e) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_ERROR_HANDLING_MEMBER_DEPARTURE__0), e);
+      }
+    }
+    
+    public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
+    }
+
+    public void memberSuspect(InternalDistributedMember suspect, InternalDistributedMember whoSuspected) {
+      // the direct channel isn't currently a source of suspect events, though
+      // it does request initiation of suspicion through the membership
+      // manager
+    }
+
+    public boolean isShutdownMsgSent()
+    {
+      return upCall.isShutdownMsgSent();
+    }
+
+    public void membershipFailure(String reason, Throwable t)
+    {
+      upCall.membershipFailure(reason, t);
+    }
+    
+    public void viewInstalled(NetView view) {
+      upCall.viewInstalled(view);
+    }
+
+    public DistributionManager getDM()
+    {
+     return upCall.getDM();
+    }
+
+  }
+
+
+  /** if we connect to a locator that has NPD enabled then we enable it in this VM */
+  public void enableNetworkPartitionDetection() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Network partition detection is being enabled");
+    }
+    this.services.getConfig().getDistributionConfig().setEnableNetworkPartitionDetection(true);
+    this.services.getConfig().setNetworkPartitionDetectionEnabled(true);
+  }
+  
+  /**
+   * Analyze a given view object, generate events as appropriate
+   * 
+   * @param newViewId the identifier of the new view
+   * @param newView the new membership view
+   */
+  protected void processView(long newViewId, NetView newView)
+  {
+    // Sanity check...
+    if (logger.isDebugEnabled()) {
+      StringBuffer msg = new StringBuffer(200);
+      msg.append("Membership: Processing view ");
+      msg.append(newView);
+      msg.append(" on " + address.toString());
+      logger.debug(LogMarker.DM_VIEWS, msg);
+      if (!newView.contains(address)) {
+        logger.info(LocalizedMessage.create(
+            LocalizedStrings.GroupMembershipService_THE_MEMBER_WITH_ID_0_IS_NO_LONGER_IN_MY_OWN_VIEW_1,
+            new Object[] {address, newView}));
+      }
+    }
+     
+//     if (newView.getCrashedMembers().size() > 0) {
+//       // dump stack for debugging #39827
+//       OSProcess.printStacks(0);
+//     }
+    // We perform the update under a global lock so that other
+    // incoming events will not be lost in terms of our global view.
+    synchronized (latestViewLock) {
+      // first determine the version for multicast message serialization
+      Version version = Version.CURRENT;
+      for (Iterator<Map.Entry<InternalDistributedMember, Long>> it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
+        InternalDistributedMember mbr = it.next().getKey();
+        Version itsVersion = mbr.getVersionObject();
+        if (itsVersion != null && version.compareTo(itsVersion) < 0) {
+          version = itsVersion;
+        }
+      }
+      for (InternalDistributedMember mbr: newView.getMembers()) {
+        Version itsVersion = mbr.getVersionObject();
+        if (itsVersion != null && itsVersion.compareTo(version) < 0) {
+          version = mbr.getVersionObject();
+        }
+      }
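+      // if any member of the new view (or any surprise member) is running an
+      // older product version, disable multicast until the rolling upgrade completes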
+      disableMulticastForRollingUpgrade = !version.equals(Version.CURRENT);
+      
+      if (newViewId < latestViewId) {
+        // ignore this view since it is old news
+        if (logger.isDebugEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
+          logger.debug(LogMarker.DISTRIBUTION_VIEWS, "Membership: Ignoring view (with id {}) since it is older than the last view (with id {}); ignoredView={}",
+              newViewId, latestViewId, newView);
+        }
+        return;
+      }
+
+      // Save previous view, for delta analysis
+      NetView priorView = latestView;
+      
+      // update the view to reflect our changes, so that
+      // callbacks will see the new (updated) view.
+      latestViewId = newViewId;
+      latestView = new NetView(newView, newView.getViewId());
+      
+      // look for additions
+      for (int i = 0; i < newView.getMembers().size(); i++) { // additions
+        InternalDistributedMember m = (InternalDistributedMember)newView.getMembers().get(i);
+        
+        // Once a member has been seen via JGroups, remove them from the
+        // newborn set
+        boolean wasSurprise = surpriseMembers.remove(m) != null;
+        
+        // bug #45155 - membership view processing was slow, causing a member to connect as "surprise"
+        // and the surprise timeout removed the member and shunned it, keeping it from being
+        // recognized as a valid member when it was finally seen in a view
+//        if (isShunned(m)) {
+//          warnShuns.add(m);
+//          continue;
+//        }
+
+        // if it's in a view, it's no longer suspect
+        suspectedMembers.remove(m);
+
+        if (priorView.contains(m) || wasSurprise) {
+          continue; // already seen
+        }
+      
+        // ARB: unblock any waiters for this particular member.
+        // i.e. signal any waiting threads in tcpconduit.
+        String authInit = this.services.getConfig().getDistributionConfig().getSecurityPeerAuthInit();
+        boolean isSecure = authInit != null && authInit.length() != 0;
+
+        if (isSecure) {
+          CountDownLatch currentLatch;
+          if ((currentLatch = (CountDownLatch)memberLatch.get(m)) != null) {
+            currentLatch.countDown();
+          }
+        }
+
+        // fix for bug #42006, lingering old identity
+        Object oldStub = this.memberToStubMap.remove(m);
+        if (oldStub != null) {
+          this.stubToMemberMap.remove(oldStub);
+        }
+
+        if (shutdownInProgress()) {
+          logger.info(LogMarker.DM_VIEWS, LocalizedMessage.create(
+              LocalizedStrings.GroupMembershipService_MEMBERSHIP_SHUNNING_MEMBER__0__DURING_OUR_SHUTDOWN, m));
+          addShunnedMember(m);
+          continue; // no additions processed after shutdown begins
+        } else {
+          boolean wasShunned = endShun(m); // bug #45158 - no longer shun a process that is now in view
+          if (wasShunned && logger.isDebugEnabled()) {
+            logger.debug("No longer shunning {} as it is in the current membership view", m);
+          }
+        }
+        
+        logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_PROCESSING_ADDITION__0_, m));
+
+        try {
+          listener.newMemberConnected(m, getStubForMember(m));
+        }
+        catch (VirtualMachineError err) {
+          SystemFailure.initiateFailure(err);
+          // If this ever returns, rethrow the error.  We're poisoned
+          // now, so don't let this thread continue.
+          throw err;
+        }
+        catch (DistributedSystemDisconnectedException e) {
+          // don't log shutdown exceptions
+        }
+        catch (Throwable t) {
+          // Whenever you catch Error or Throwable, you must also
+          // catch VirtualMachineError (see above).  However, there is
+          // _still_ a possibility that you are dealing with a cascading
+          // error condition, so you also need to check to see if the JVM
+          // is still usable:
+          SystemFailure.checkFailure();
+          logger.info(LocalizedMessage.create(
+              LocalizedStrings.GroupMembershipService_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_ADDITION_OF__0, m), t);
+        }
+      } // additions
+      
+      // look for departures
+      for (int i = 0; i < priorView.getMembers().size(); i++) { // departures
+        InternalDistributedMember m = (InternalDistributedMember)priorView.getMembers().get(i);
+        if (newView.contains(m)) {
+          continue; // still alive
+        }
+        
+        if (surpriseMembers.containsKey(m)) {
+          continue; // member has not yet appeared in JGroups view
+        }
+
+        try {
+          logger.info(LogMarker.DM_VIEWS, LocalizedMessage.create(
+              LocalizedStrings.GroupMembershipService_MEMBERSHIP_PROCESSING_DEPARTING_MEMBER__0_, m));
+          removeWithViewLock(m,
+              newView.getCrashedMembers().contains(m) || suspectedMembers.containsKey(m)
+              , "departed JGroups view");
+        }
+        catch (VirtualMachineError err) {
+          SystemFailure.initiateFailure(err);
+          // If this ever returns, rethrow the error.  We're poisoned
+          // now, so don't let this thread continue.
+          throw err;
+        }
+        catch (Throwable t) {
+          // Whenever you catch Error or Throwable, you must also
+          // catch VirtualMachineError (see above).  However, there is
+          // _still_ a possibility that you are dealing with a cascading
+          // error condition, so you also need to check to see if the JVM
+          // is still usable:
+          SystemFailure.checkFailure();
+          logger.info(LocalizedMessage.create(
+              LocalizedStrings.GroupMembershipService_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_REMOVAL_OF__0, m), t);
+        }
+      } // departures
+      
+      // expire surprise members, add others to view
+      long oldestAllowed = System.currentTimeMillis() - this.surpriseMemberTimeout;
+      for (Iterator<Map.Entry<InternalDistributedMember, Long>> it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<InternalDistributedMember, Long> entry = it.next();
+        Long birthtime = (Long)entry.getValue();
+        if (birthtime.longValue() < oldestAllowed) {
+          it.remove();
+          InternalDistributedMember m = entry.getKey();
+          logger.info(LocalizedMessage.create(
+            LocalizedStrings.GroupMembershipService_MEMBERSHIP_EXPIRING_MEMBERSHIP_OF_SURPRISE_MEMBER_0, m));
+          removeWithViewLock(m, true, "not seen in membership view in "
+              + this.surpriseMemberTimeout + "ms");
+        }
+        else {
+          if (!latestView.contains(entry.getKey())) {
+            latestView.add(entry.getKey());
+          }
+        }
+      }
+      // expire suspected members
+      oldestAllowed = System.currentTimeMillis() - this.suspectMemberTimeout;
+      for (Iterator it=suspectedMembers.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry entry = (Map.Entry)it.next();
+        Long birthtime = (Long)entry.getValue();
+        if (birthtime.longValue() < oldestAllowed) {
+          InternalDistributedMember m = (InternalDistributedMember)entry.getKey();
+          it.remove();
+          if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
+            logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: expiring suspect member <{}>", m);
+          }
+        }
+      }
+      try {
+        listener.viewInstalled(latestView);
+        startCleanupTimer();
+      }
+      catch (DistributedSystemDisconnectedException se) {
+      }
+    } // synchronized
+    logger.info(LogMarker.DM_VIEWS, LocalizedMessage.create(
+        LocalizedStrings.GroupMembershipService_MEMBERSHIP_FINISHED_VIEW_PROCESSING_VIEWID___0, Long.valueOf(newViewId)));
+  }
+
+  /** an exception that caused the manager to shut down */
+  volatile Exception shutdownCause;
+
+  /**
+   * the timer used to perform periodic tasks
+   * @guarded.By latestViewLock
+   */
+  private SystemTimer cleanupTimer;
+
+  private Services services;
+
+
+  @Override
+  public boolean isMulticastAllowed() {
+    return !disableMulticastForRollingUpgrade;
+  }
+
+  /**
+   * Joins the distributed system
+   * 
+   * @throws GemFireConfigException - configuration error
+   * @throws SystemConnectException - problem joining
+   */
+  private void join() {
+    this.shutdownCause = null;
+    
+    latestViewLock.writeLock().lock();
+    try {
+      try {
+        this.isJoining = true; // added for bug #44373
+
+        // connect
+        long start = System.currentTimeMillis();
+
+        boolean ok = services.getJoinLeave().join();
+        
+        if (!ok) {
+          throw new GemFireConfigException("Unable to contact a Locator service.  " +
+              "Operation either timed out, was stopped or Locator does not exist.");
+        }
+
+        long delta = System.currentTimeMillis() - start;
+
+        logger.info(LogMarker.DISTRIBUTION, LocalizedMessage.create(
+            LocalizedStrings.GroupMembershipService_CONNECTED_TO_JGROUPS_CHANNEL_TOOK__0__MS, delta));
+
+      } catch (RuntimeException ex) {
+        throw ex;
+      }
+      catch (Exception ex) {
+        if (ex.getCause() != null && ex.getCause().getCause() instanceof SystemConnectException) {
+          throw (SystemConnectException)(ex.getCause().getCause());
+        }
+        throw new DistributionException(LocalizedStrings.GroupMembershipService_AN_EXCEPTION_WAS_THROWN_WHILE_CONNECTING_TO_JGROUPS.toLocalizedString(), ex);
+      }
+      finally {
+        this.isJoining = false;
+      }
+    }
+    finally {
+      latestViewLock.writeLock().unlock();
+    }
+  }
+
+
+  public GMSMembershipManager(DistributedMembershipListener listener) {
+    Assert.assertTrue(listener != null);
+    this.listener = listener;
+  }
+  
+  @Override
+  public void init(Services services) {
+    this.services = services;
+
+    Assert.assertTrue(services != null);
+    
+    this.stats = services.getStatistics();
+    DistributionConfig config = services.getConfig().getDistributionConfig();
+    RemoteTransportConfig transport = services.getConfig().getTransport();
+
+    this.membershipCheckTimeout = config.getSecurityPeerMembershipTimeout();
+    this.wasReconnectingSystem = transport.getIsReconnectingDS();
+    this.oldDSUDPSocket = (DatagramSocket)transport.getOldDSMembershipInfo();
+    
+    if (!config.getDisableTcp()) {
+      dcReceiver = new MyDCReceiver(listener);
+    }
+  }
+  
+  @Override
+  public void start() {
+    DistributionConfig config = services.getConfig().getDistributionConfig();
+    RemoteTransportConfig transport = services.getConfig().getTransport();
+
+    int dcPort = 0;
+    if (!config.getDisableTcp()) {
+      directChannel = new DirectChannel(this, dcReceiver, config, null);
+      dcPort = directChannel.getPort();
+    }
+
+    
+    services.getMessenger().getMemberID().setDirectChannelPort(dcPort);
+
+    MemberAttributes.setDefaults(dcPort,
+        MemberAttributes.DEFAULT.getVmPid(),
+        MemberAttributes.DEFAULT.getVmKind(),
+        MemberAttributes.DEFAULT.getVmViewId(),
+        MemberAttributes.DEFAULT.getName(),
+        MemberAttributes.DEFAULT.getGroups(), MemberAttributes.DEFAULT.getDurableClientAttributes());
+
+    this.vmKind = MemberAttributes.DEFAULT.getVmKind(); // we need this during jchannel startup
+
+    surpriseMemberTimeout = Math.max(20 * DistributionConfig.DEFAULT_MEMBER_TIMEOUT,
+        20 * config.getMemberTimeout());
+    surpriseMemberTimeout = Integer.getInteger("gemfire.surprise-member-timeout", surpriseMemberTimeout).intValue();
+
+  }
+  
+  
+  @Override
+  public void joinDistributedSystem() {
+    try {
+      join();
+    }
+    catch (RuntimeException e) {
+      if (directChannel != null) {
+        directChannel.disconnect(e);
+      }
+      throw e;
+    }
+    
+    this.address = services.getMessenger().getMemberID();
+
+    int dcPort = 0;
+    if (directChannel != null) {
+      dcPort = directChannel.getPort();
+    }
+    
+    MemberAttributes.setDefaults(dcPort,
+        MemberAttributes.DEFAULT.getVmPid(),
+        MemberAttributes.DEFAULT.getVmKind(),
+        address.getVmViewId(),
+        MemberAttributes.DEFAULT.getName(),
+        MemberAttributes.DEFAULT.getGroups(), MemberAttributes.DEFAULT.getDurableClientAttributes());
+    
+    if (directChannel != null) {
+      directChannel.getConduit().setVmViewID(address.getVmViewId());
+    }
+
+    // in order to debug startup issues we need to announce the membership
+    // ID as soon as we know it
+    logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_entered_into_membership_in_group_0_with_id_1,
+        new Object[]{address}));
+
+    if (!services.getConfig().getDistributionConfig().getDisableTcp()) {
+      this.conduit = directChannel.getConduit();
+      directChannel.setLocalAddr(address);
+      Stub stub = conduit.getId();
+      memberToStubMap.put(address, stub);
+      stubToMemberMap.put(stub, address);
+    }
+    
+    this.hasConnected = true;
+  }
+  
+  @Override
+  public void started() {
+  }
+  
+
+  /** this is invoked by JGroups when there is a loss of quorum in the membership system */
+  public void quorumLost(Set failures, List remaining) {
+    // notify of quorum loss if split-brain detection is enabled (meaning we'll shut down) or
+    // if the loss is more than one member
+    
+    boolean notify = failures.size() > 1;
+    if (!notify) {
+      notify = services.getConfig().isNetworkPartitionDetectionEnabled();
+    }
+    
+    if (notify) {
+      if (inhibitForceDisconnectLogging) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("<ExpectedException action=add>Possible loss of quorum</ExpectedException>");
+        }
+      }
+      logger.fatal(LocalizedMessage.create(
+          LocalizedStrings.GroupMembershipService_POSSIBLE_LOSS_OF_QUORUM_DETECTED, new Object[] {failures.size(), failures}));
+      if (inhibitForceDisconnectLogging) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("<ExpectedException action=remove>Possible loss of quorum</ExpectedException>");
+        }
+      }
+      
+  
+      try {
+        this.listener.quorumLost(new HashSet<InternalDistributedMember>(failures),
+            remaining);
+      } catch (CancelException e) {
+        // safe to ignore - a forced disconnect probably occurred
+      }
+    }
+  }
+  
+
+  @Override
+  public boolean testMulticast() {
+    return true;
+  }
+  
+  /**
+   * Remove a member, or queue a startup operation to do so
+   * @param dm the member to shun
+   * @param crashed true if crashed
+   * @param reason the reason, esp. if crashed
+   */
+  protected void handleOrDeferRemove(InternalDistributedMember dm,
+      boolean crashed, String reason) {
+    synchronized(startupLock) {
+      if (!processingEvents) {
+        startupMessages.add(new StartupEvent(dm, crashed, reason));
+        return;
+      }
+    }
+    removeMember(dm, crashed, reason);
+  }
+  
+  /**
+   * Remove a member.  {@link #latestViewLock} must be held
+   * before this method is called.  If member is not already shunned,
+   * the uplevel event handler is invoked.
+   * 
+   * @param dm
+   * @param crashed
+   * @param reason
+   */
+  protected void removeWithViewLock(InternalDistributedMember dm,
+      boolean crashed, String reason) {
+    boolean wasShunned = isShunned(dm);
+
+    // Delete resources
+    destroyMember(dm, crashed, reason);
+
+    if (wasShunned) {
+      return; // Explicit deletion, no upcall.
+    }
+    
+    if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
+      logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: dispatching uplevel departure event for < {} >", dm);
+    }
+    
+    try {
+      listener.memberDeparted(dm, crashed, reason);
+    }
+    catch (DistributedSystemDisconnectedException se) {
+      // let's not get huffy about it
+    }
+  }
+  
+  /**
+   * Automatic removal of a member (for internal
+   * use only).  Synchronizes on {@link #latestViewLock} and then deletes
+   * the member.
+   * 
+   * @param dm
+   * @param crashed
+   * @param reason
+   */
+  protected void removeMember(InternalDistributedMember dm,
+      boolean crashed, String reason)
+  {
+    if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
+      StringBuffer sb = new StringBuffer(200);
+      sb.append("Membership: removing <")
+         .append(dm)
+         .append(">; crashed = ")
+         .append(crashed)
+         .append("; reason = ");
+      if (reason != null && (reason.indexOf("NoSuchMemberException") >= 0)) {
+        sb.append("tcp/ip connections closed");
+      }
+      else {
+        sb.append(reason);
+      }
+      logger.trace(LogMarker.DISTRIBUTION_VIEWS, sb);
+    }
+    latestViewLock.writeLock().lock();
+    try {
+      removeWithViewLock(dm, crashed, reason);
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+  }
+  
+ 
+  /**
+   * Process a surprise connect event, or place it on the startup queue.
+   * @param member the member
+   * @param stub its stub
+   */
+  protected void handleOrDeferSurpriseConnect(InternalDistributedMember member,
+      Stub stub) {
+    synchronized (startupLock) {
+      if (!processingEvents) {
+        startupMessages.add(new StartupEvent(member, stub));
+        return;
+      }
+    }
+    processSurpriseConnect(member, stub);
+  }
+  
+  public void startupMessageFailed(DistributedMember mbr, String failureMessage) {
+    // fix for bug #40666
+    addShunnedMember((InternalDistributedMember)mbr);
+    // fix for bug #41329, hang waiting for replies
+    try {
+      listener.memberDeparted((InternalDistributedMember)mbr, true, "failed to pass startup checks");
+    }
+    catch (DistributedSystemDisconnectedException se) {
+      // let's not get huffy about it
+    }
+  }
+
+  
+  /**
+   * Logic for handling a direct connection event (message received
+   * from a member not in the JGroups view).  Does not employ the
+   * startup queue.
+   * <p>
+   * Must be called with {@link #latestViewLock} held.  Waits
+   * until there is a stable view.  If the member has already
+   * been added, simply returns; else adds the member.
+   * 
+   * @param dm the member joining
+   * @param stub the member's stub
+   */
+  public boolean addSurpriseMember(DistributedMember dm, 
+      Stub stub) {
+    final InternalDistributedMember member = (InternalDistributedMember)dm;
+    Stub s = null;
+    boolean warn = false;
+    
+    latestViewLock.writeLock().lock();
+    try {
+      // At this point, the join may have been discovered by
+      // other means.
+      if (latestView.contains(member)) {
+        return true;
+      }
+      if (surpriseMembers.containsKey(member)) {
+        return true;
+      }
+      if (latestView.getViewId() > member.getVmViewId()) {
+        // tell the process that it should shut down distribution.
+        // Run in a separate thread so we don't hold the view lock during the request.  Bug #44995
+        new Thread(Thread.currentThread().getThreadGroup(),
+            "Removing shunned GemFire node " + member) {
+          @Override
+          public void run() {
+            // fix for bug #42548
+            // this is an old member that shouldn't be added
+            logger.fatal(LocalizedMessage.create(
+                LocalizedStrings.GroupMembershipService_Invalid_Surprise_Member, new Object[]{member, latestView}));
+            requestMemberRemoval(member, "this member is no longer in the view but is initiating connections");
+          }
+        }.start();
+        addShunnedMember(member);
+        return false;
+      }
+
+      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
+        logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: Received message from surprise member: <{}>.  My view number is {} it is {}", 
+            member, latestView.getViewId(), member.getVmViewId());
+      }
+
+      // Adding him to this set ensures we won't remove him if a new
+      // JGroups view comes in and he's still not visible.
+      surpriseMembers.put(member, Long.valueOf(System.currentTimeMillis()));
+
+      if (shutdownInProgress()) {
+        if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
+          logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: new member during shutdown ignored: <{}>", member); 
+        }
+
+        // Force disconnect, esp. the TCPConduit
+        String msg = LocalizedStrings.GroupMembershipService_THIS_DISTRIBUTED_SYSTEM_IS_SHUTTING_DOWN.toLocalizedString();
+        if (directChannel != null) {
+          try {
+            directChannel.closeEndpoint(member, msg);
+          } catch (DistributedSystemDisconnectedException e) {
+            // ignore - happens during shutdown
+          }
+        }
+        destroyMember(member, false, msg); // for good luck
+        return true; // allow during shutdown
+      }
+
+      if (isShunned(member)) {
+        warn = true;
+        surpriseMembers.remove(member);
+      } else {
+
+        // Now that we're sure the member is new, add them.
+        if (logger.isTraceEnabled(LogMarker.DM_VIEWS)) {
+          logger.trace(LogMarker.DM_VIEWS, "Membership: Processing surprise addition <{}>", member);
+        }
+
+        // make sure the surprise-member cleanup task is running
+        if (this.cleanupTimer == null) {
+          startCleanupTimer();
+        } // cleanupTimer == null
+
+        // fix for bug #42006, lingering old identity
+        Object oldStub = this.memberToStubMap.remove(member);
+        if (oldStub != null) {
+          this.stubToMemberMap.remove(oldStub);
+        }
+
+        s = stub == null ? getStubForMember(member) : stub;
+        // Make sure that channel information is consistent
+        addChannel(member, s);
+
+        // Ensure that the member is accounted for in the view
+        // Conjure up a new view including the new member. This is necessary
+        // because we are about to tell the listener about a new member, so
+        // the listener should rightfully expect that the member is in our
+        // membership view.
+
+        // However, we put the new member at the end of the list.  This
+        // should ensure he's not chosen as an elder.
+        // This will get corrected when he finally shows up in the JGroups
+        // view.
+        NetView newMembers = new NetView(latestView, latestView.getViewId());
+        newMembers.add(member);
+        latestView = newMembers;
+      }
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+    if (warn) { // fix for bug #41538 - deadlock while alerting
+      logger.warn(LocalizedMessage.create(
+          LocalizedStrings.GroupMembershipService_MEMBERSHIP_IGNORING_SURPRISE_CONNECT_FROM_SHUNNED_MEMBER_0, member));
+    } else {
+      listener.newMemberConnected(member, s);
+    }
+    return !warn;
+  }
+  
+
+  /** starts periodic task to perform cleanup chores such as expire surprise members */
+  private void startCleanupTimer() {
+    latestViewLock.readLock().lock();
+    try {
+      if (this.cleanupTimer != null) {
+        return;
+      }
+      DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
+      if (ds != null && ds.isConnected()) {
+        this.cleanupTimer = new SystemTimer(ds, true);
+        SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() {
+          @Override
+          public void run2() {
+            latestViewLock.writeLock().lock();
+            try {
+              long oldestAllowed = System.currentTimeMillis() - surpriseMemberTimeout;
+              for (Iterator it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
+                Map.Entry entry = (Map.Entry)it.next();
+                Long birthtime = (Long)entry.getValue();
+                if (birthtime.longValue() < oldestAllowed) {
+                  it.remove();
+                  InternalDistributedMember m = (InternalDistributedMember)entry.getKey();
+                  logger.info(LocalizedMessage.create(
+                      LocalizedStrings.GroupMembershipService_MEMBERSHIP_EXPIRING_MEMBERSHIP_OF_SURPRISE_MEMBER_0, m));
+                  removeWithViewLock(m, true, "not seen in membership view in "
+                      + surpriseMemberTimeout + "ms");
+                }
+              }
+            } finally {
+              latestViewLock.writeLock().unlock();
+            }
+          }
+        };
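+        // the expiry task first runs after one full surprise-member timeout and
+        // then repeats at one third of that interval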
+        this.cleanupTimer.scheduleAtFixedRate(st, surpriseMemberTimeout, surpriseMemberTimeout/3);
+      } // ds != null && ds.isConnected()
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+  }
+  /**
+   * Dispatch the distribution message, or place it on the startup queue.
+   * 
+   * @param msg the message to process
+   */
+  protected void handleOrDeferMessage(DistributionMessage msg) {
+    synchronized(startupLock) {
+      if (!processingEvents) {
+        startupMessages.add(new StartupEvent(msg));
+        return;
+      }
+    }
+    processMessage(msg);
+  }
+  
+  public void warnShun(DistributedMember m) {
+    latestViewLock.writeLock().lock();
+    try {
+      if (!shunnedMembers.containsKey(m)) {
+        return; // not shunned
+      }
+      if (shunnedAndWarnedMembers.contains(m)) {
+        return; // already warned
+      }
+      shunnedAndWarnedMembers.add(m);
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+    // issue warning outside of sync since it may cause messaging and we don't
+    // want to hold the view lock while doing that
+    logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_DISREGARDING_SHUNNED_MEMBER_0, m));
+  }
+  
+  /**
+   * Logic for processing a distribution message.  
+   * <p>
+   * It is possible to receive messages not consistent with our view.
+   * We handle this here, and generate an uplevel event if necessary
+   * @param msg the message
+   */
+  public void processMessage(DistributionMessage msg) {
+    boolean isNew = false;
+    InternalDistributedMember m = msg.getSender();
+    boolean shunned = false;
+
+    // First grab the lock: check the sender against our stabilized view.
+    latestViewLock.readLock().lock();
+    try {
+      if (isShunned(m)) {
+        if (msg instanceof StartupMessage) {
+          endShun(m);
+        }
+        else {
+          // fix for bug 41538 - sick alert listener causes deadlock
+          // due to view lock being held during messaging
+          shunned = true;
+        }
+      } // isShunned
+
+      if (!shunned) {
+        isNew = !latestView.contains(m) && !surpriseMembers.containsKey(m);
+
+        // If it's a new sender, wait our turn, generate the event
+        if (isNew) {
+          shunned = !addSurpriseMember(m, getStubForMember(m));
+        } // isNew
+      }
+
+      // Latch the view before we unlock
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+    
+    if (shunned) { // bug #41538 - shun notification must be outside synchronization to avoid hanging
+      warnShun(m);
+      logger.info("Membership: Ignoring message from shunned member <{}>:{}", m, msg);
+      throw new MemberShunnedException(getStubForMember(m));
+    }
+    
+    listener.messageReceived(msg);
+  }
+
+  /**
+   * Process a new view object, or place on the startup queue
+   * @param viewArg the new view
+   */
+  protected void handleOrDeferViewEvent(NetView viewArg) {
+    if (this.isJoining) {
+      // bug #44373 - queue all view messages while connecting the jgroups channel.
+      // This is done under the latestViewLock, but we can't block here because
+      // we're sitting in the UDP reader thread.
+      synchronized(startupLock) {
+        startupMessages.add(new StartupEvent(viewArg));
+        return;
+      }
+    }
+    latestViewLock.readLock().lock();
+    try {
+      synchronized(startupLock) {
+        if (!processingEvents) {
+          startupMessages.add(new StartupEvent(viewArg));
+          return;
+        }
+      }
+      // view processing can take a while, so we use a separate thread
+      // to avoid blocking the jgroups stack
+      NetView newView = viewArg;
+      long newId = viewArg.getViewId();
+      if (logger.isTraceEnabled(LogMarker.DM_VIEWS)) {
+        logger.trace(LogMarker.DM_VIEWS, "Membership: queuing new view for processing, id = {}, view = {}", 
+            newId, newView);
+      }
+      FakeViewMessage v = new FakeViewMessage(address, newId, newView,
+          GMSMembershipManager.this);
+
+      listener.messageReceived(v);
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+  }
+  
+  /**
+   * Process a member-suspect event.  The event is dropped if startup
+   * events are still being deferred.
+   * @param suspectInfo the suspected member and the member who raised the suspicion
+   */
+  protected void handleOrDeferSuspect(SuspectMember suspectInfo) {
+    latestViewLock.writeLock().lock();
+    try {
+      synchronized(startupLock) {
+        if (!processingEvents) {
+          return;
+        }
+      }
+      InternalDistributedMember suspect = suspectInfo.suspectedMember;
+      InternalDistributedMember who = suspectInfo.whoSuspected;
+      this.suspectedMembers.put(suspect, Long.valueOf(System.currentTimeMillis()));
+      try {
+        listener.memberSuspect(suspect, who);
+      }
+      catch (DistributedSystemDisconnectedException se) {
+        // let's not get huffy about it
+      }
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Process a potential direct connect.  Does not use
+   * the startup queue.  It grabs the {@link #latestViewLock} 
+   * and then processes the event.
+   * <p>
+   * It is a <em>potential</em> event, because we don't know until we've
+   * grabbed a stable view if this is really a new member.
+   * 
+   * @param member
+   * @param stub
+   */
+  private void processSurpriseConnect(
+      InternalDistributedMember member, 
+      Stub stub) 
+  {
+    latestViewLock.writeLock().lock();
+    try {
+      addSurpriseMember(member, stub);
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+  }
+  
+  /**
+   * Dispatch routine for processing a single startup event
+   * @param o the startup event to handle
+   */
+  private void processStartupEvent(StartupEvent o) {
+    // Most common events first
+    
+    if (o.isDistributionMessage()) { // normal message
+      try {
+        processMessage(o.dmsg);
+      }
+      catch (MemberShunnedException e) {
+        // message from non-member - ignore
+      }
+    } 
+    else if (o.isGmsView()) { // view event
+      processView(o.gmsView.getViewId(), o.gmsView);
+    }
+    else if (o.isDepartureEvent()) { // departure
+      removeMember(o.member, o.crashed, o.reason);
+    }
+    else if (o.isConnect()) { // connect
+      processSurpriseConnect(o.member, o.stub);
+    }
+    
+    else // sanity
+      throw new InternalGemFireError(LocalizedStrings.GroupMembershipService_UNKNOWN_STARTUP_EVENT_0.toLocalizedString(o));
+  }
+  
+  /**
+   * Special mutex to create a critical section for
+   * {@link #startEventProcessing()}
+   */
+  private final Object startupMutex = new Object();
+
+  
+  public void startEventProcessing()
+  {
+    // Only allow one thread to perform the work
+    synchronized (startupMutex) {
+      if (logger.isDebugEnabled())
+        logger.debug("Membership: draining startup events.");
+      // Remove the backqueue of messages, but allow
+      // additional messages to be added.
+      for (;;) {
+        StartupEvent ev;
+        // Only grab the mutex while reading the queue.
+        // Other events may arrive while we're attempting to
+        // drain the queue.  This is OK, we'll just keep processing
+        // events here until we've caught up.
+        synchronized (startupLock) {
+          int remaining = startupMessages.size();
+          if (remaining == 0) {
+            // While holding the lock, flip the bit so that
+            // no more events get put into startupMessages, and
+            // notify all waiters to proceed.
+            processingEvents = true;
+            startupLock.notifyAll();
+            break;  // ...and we're done.
+          }
+          if (logger.isDebugEnabled()) {
+            logger.debug("Membership: {} remaining startup message(s)", remaining);
+          }
+          ev = (StartupEvent)startupMessages.removeFirst();
+        } // startupLock
+        try {
+          processStartupEvent(ev);
+        }
+        catch (VirtualMachineError err) {
+          SystemFailure.initiateFailure(err);
+          // If this ever returns, rethrow the error.  We're poisoned
+          // now, so don't let this thread continue.
+          throw err;
+        }
+        catch (Throwable t) {
+          // Whenever you catch Error or Throwable, you must also
+          // catch VirtualMachineError (see above).  However, there is
+          // _still_ a possibility that you are dealing with a cascading
+          // error condition, so you also need to check to see if the JVM
+          // is still usable:
+          SystemFailure.checkFailure();
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_ERROR_HANDLING_STARTUP_EVENT), t);
+        }
+        
+      } // for
+      if (logger.isDebugEnabled())
+        logger.debug("Membership: finished processing startup events.");
+    } // startupMutex
+  }
+
+ 
+  public void waitForEventProcessing() throws InterruptedException {
+    // First check outside of a synchronized block.  Cheaper and sufficient.
+    if (Thread.interrupted()) throw new InterruptedException();
+    if (processingEvents)
+      return;
+    if (logger.isDebugEnabled()) {
+      logger.debug("Membership: waiting until the system is ready for events");
+    }
+    for (;;) {
+      conduit.getCancelCriterion().checkCancelInProgress(null);
+      synchronized (startupLock) {
+        // Now check using a memory fence and synchronization.
+        if (processingEvents)
+          break;
+        boolean interrupted = Thread.interrupted();
+        try {
+          startupLock.wait();
+        }
+        catch (InterruptedException e) {
+          interrupted = true;
+          conduit.getCancelCriterion().checkCancelInProgress(e);
+        }
+        finally {
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      } // synchronized
+    } // for
+    if (logger.isDebugEnabled()) {
+      logger.debug("Membership: continuing");
+    }
+  }
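+  // Hypothetical consumer-side sketch: a thread that must not run ahead of startup
+  // would typically call waitForEventProcessing() before delivering events, e.g.
+  //   membershipManager.waitForEventProcessing();
+  //   // safe to deliver membership events from this point on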
+
+
+  public ReadWriteLock getViewLock() {
+    return this.latestViewLock;
+  }
+
+  /**
+   * Returns a copy (possibly not current) of the current
+   * view (a list of {@link DistributedMember}s)
+   */
+  public NetView getView()
+  {
+    // Grab the latest view while holding the read lock...
+    NetView v;
+
+    latestViewLock.readLock().lock();
+    try {
+      v = latestView;
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+
+    NetView result = new NetView(v, v.getViewId());
+    
+    for (InternalDistributedMember m: v.getMembers()) {
+      if (isShunned(m)) {
+        result.remove(m);
+      }
+    }
+    
+    return result;
+  }
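+  // Illustrative usage sketch (hypothetical caller): the returned NetView is a
+  // snapshot with shunned members filtered out, so it can be examined without
+  // holding the view lock:
+  //   for (InternalDistributedMember m : membershipManager.getView().getMembers()) {
+  //     // m is a non-shunned member as of the snapshot
+  //   }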
+  
+  /**
+   * test hook<p>
+   * The lead member is the eldest member with partition detection enabled.<p>
+   * If no members have partition detection enabled, there will be no
+   * lead member and this method will return null.
+   * @return the lead member associated with the latest view
+   */
+  public DistributedMember getLeadMember() {
+    latestViewLock.readLock().lock();
+    try {
+      return latestView == null? null : latestView.getLeadMember();
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+  }
+  
+  
+  protected boolean isJoining() {
+    return this.isJoining;
+  }
+  
+  /**
+   * test hook
+   * @return the current membership view coordinator
+   */
+  public DistributedMember getCoordinator() {
+    // note - the DistributionManager queues view changes in a serial executor
+    // and installs them asynchronously, so the DM may still see an old
+    // coordinator; here we read the coordinator from the latest installed view
+    latestViewLock.readLock().lock();
+    try {
+      return latestView == null? null : latestView.getCoordinator();
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+  }
+
+  public boolean memberExists(InternalDistributedMember m) {
+    NetView v;
+    latestViewLock.readLock().lock();
+    try {
+      v = latestView;
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+    return v.getMembers().contains(m);
+  }
+  
+  /**
+   * Returns the identity associated with this member. WARNING: this value is
+   * still returned after the channel is closed; in that case it is good for
+   * logging purposes only. :-)
+   */
+  public InternalDistributedMember getLocalMember()
+  {
+    return address;
+  }
+  
+  public Services getServices() {
+    return services;
+  }
+
+  public void postConnect()
+  {
+    if (channelPause > 0) {
+      logger.info(LocalizedMessage.create(
+          LocalizedStrings.GroupMembershipService_MEMBERSHIP_PAUSING_TO_ALLOW_OTHER_CONCURRENT_PROCESSES_TO_JOIN_THE_DISTRIBUTED_SYSTEM));
+      try {
+        Thread.sleep(channelPause);
+      }
+      catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+      channelPause = 0;
+    }
+  }
+  
+  /**
+   * Breaks any potential circularity in {@link #loadEmergencyClasses()}
+   * @see SystemFailure#loadEmergencyClasses()
+   */
+  private static volatile boolean emergencyClassesLoaded = false;
+
+  /**
+   * inhibits logging of ForcedDisconnectException to keep dunit logs clean
+   * while testing this feature
+   */
+  protected static volatile boolean inhibitForceDisconnectLogging;
+  
+  /**
+   * Ensure that the critical classes from JGroups and the TCP conduit
+   * implementation get loaded.
+   * 
+   * @see SystemFailure#loadEmergencyClasses()
+   */
+  public static void loadEmergencyClasses() {
+    if (emergencyClassesLoaded) return;
+    emergencyClassesLoaded = true;
+    DirectChannel.loadEmergencyClasses();
+    GMSJoinLeave.loadEmergencyClasses();
+    GMSHealthMonitor.loadEmergencyClasses();
+  }
+  /**
+   * Close the receiver, avoiding all potential deadlocks and
+   * eschewing any attempts at being graceful.
+   * 
+   * @see SystemFailure#emergencyClose()
+   */
+  public void emergencyClose() {
+    final boolean DEBUG = SystemFailure.TRACE_CLOSE;
+    
+    setShutdown(); 
+
+    // We can't call close() because they will allocate objects.  Attempt
+    // a surgical strike and touch the important protocols.
+    
+    // MOST important, kill the FD protocols...
+    services.emergencyClose();
+    
+    // Close the TCPConduit sockets...
+    if (directChannel != null) {
+      if (DEBUG) {
+        System.err.println("DEBUG: emergency close of DirectChannel");
+      }
+      directChannel.emergencyClose();
+    }
+    
+    services.getJoinLeave().emergencyClose();
+    services.getMessenger().emergencyClose();
+    services.getHealthMonitor().emergencyClose();
+    services.getAuthenticator().emergencyClose();
+
+    
+    // TODO: could we guarantee not to allocate objects?  We're using Darrel's 
+    // factory, so it's possible that an unsafe implementation could be
+    // introduced here.
+//    stubToMemberMap.clear();
+//    memberToStubMap.clear();
+    
+    this.timer.cancel();
+    
+    if (DEBUG) {
+      System.err.println("DEBUG: done closing GroupMembershipService");
+    }
+  }
+  
+  
+  /**
+   * In order to avoid a split-brain when a member is shutting down (due to
+   * race conditions in view management), we record it as a shutdown member when
+   * we receive a shutdown message.  This is not the same as a SHUNNED member.
+   */
+  public void shutdownMessageReceived(InternalDistributedMember id, String reason) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Membership: recording shutdown status of {}", id);
+    }
+    synchronized(this.shutdownMembers) { 
+      this.shutdownMembers.put(id, id);
+    }
+  }
+  
+  /**
+   * returns true if a shutdown message has been received from the given address but
+   * that member is still in the membership view or is a surprise member.
+   */
+  public boolean isShuttingDown(InternalDistributedMember mbr) {
+    synchronized(shutdownMembers) {
+      return shutdownMembers.containsKey(mbr);
+    }
+  }
+
+  
+  public void shutdown()
+  {
+    setShutdown(); // for safety
+    services.stop();
+  }
+  
+  @Override
+  public void stop() {
+    
+    // [bruce] Do not null out the channel w/o adding appropriate synchronization
+    
+    logger.debug("MembershipManager closing");
+    
+    if (directChannel != null) {
+      directChannel.disconnect(null);
+
+      if (address != null) {
+        // Make sure that channel information is consistent
+        // Probably not important in this particular case, but just
+        // to be consistent...
+        latestViewLock.writeLock().lock();
+        try {
+          destroyMember(address, false, "orderly shutdown");
+        } finally {
+          latestViewLock.writeLock().unlock();
+        }
+      }
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Membership: channel closed");
+    }
+  }
+  
+  public void uncleanShutdown(String reason, final Exception e) {
+    inhibitForcedDisconnectLogging(false);
+    
+    if (this.directChannel != null) {
+      this.directChannel.disconnect(e);
+    }
+    
+    // first shut down communication so we don't do any more harm to other
+    // members
+    services.emergencyClose();
+    
+    // we have to clear the view before notifying the membership listener,
+    // so that it won't try sending disconnect messages to members that
+    // aren't there.  Otherwise, it sends the disconnect messages to other
+    // members, they ignore the "surprise" connections, and we hang.
+    //GroupMembershipService.this.clearView();
+    if (e != null) {
+      try {
+        if (membershipTestHooks != null) {
+          List l = membershipTestHooks;
+          for (Iterator it=l.iterator(); it.hasNext(); ) {
+            MembershipTestHook dml = (MembershipTestHook)it.next();
+            dml.beforeMembershipFailure(reason, e);
+          }
+        }
+        listener.membershipFailure(reason, e);
+        if (membershipTestHooks != null) {
+          List l = membershipTestHooks;
+          for (Iterator it=l.iterator(); it.hasNext(); ) {
+            MembershipTestHook dml = (MembershipTestHook)it.next();
+            dml.afterMembershipFailure(reason, e);
+          }
+        }
+      }
+      catch (RuntimeException re) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_EXCEPTION_CAUGHT_WHILE_SHUTTING_DOWN), re);
+      }
+    }
+  }
+  
+  /** generate XML for the cache before shutting down due to forced disconnect */
+  public void saveCacheXmlForReconnect() {
+    // there are two versions of this method so it can be unit-tested
+    boolean sharedConfigEnabled = services.getConfig().getDistributionConfig().getUseSharedConfiguration();
+    saveCacheXmlForReconnect(sharedConfigEnabled);
+  }
+  
+  /** generate XML from the cache before shutting down due to forced disconnect */
+  public void saveCacheXmlForReconnect(boolean sharedConfigEnabled) {
+    // first save the current cache description so reconnect can rebuild the cache
+    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    if (cache != null && (cache instanceof Cache)) {
+      if (!Boolean.getBoolean("gemfire.autoReconnect-useCacheXMLFile")
+          && !cache.isSqlfSystem() && !sharedConfigEnabled) {
+        try {
+          logger.info("generating XML to rebuild the cache after reconnect completes");
+          StringPrintWriter pw = new StringPrintWriter(); 
+          CacheXmlGenerator.generate((Cache)cache, pw, true, false);
+          String cacheXML = pw.toString();
+          cache.getCacheConfig().setCacheXMLDescription(cacheXML);
+          logger.info("XML generation completed: {}", cacheXML);
+        } catch (CancelException e) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_PROBLEM_GENERATING_CACHE_XML), e);
+        }
+      } else if (sharedConfigEnabled && !cache.getCacheServers().isEmpty()) {
+        // we need to retain a cache-server description if this JVM was started by gfsh
+        List<BridgeServerCreation> list = new ArrayList<BridgeServerCreation>(cache.getCacheServers().size());
+        for (Iterator it = cache.getCacheServers().iterator(); it.hasNext(); ) {
+          CacheServer cs = (CacheServer)it.next();
+          BridgeServerCreation bsc = new BridgeServerCreation(cache, cs);
+          list.add(bsc);
+        }
+        cache.getCacheConfig().setCacheServerCreation(list);
+        logger.info("CacheServer configuration saved");
+      }
+    }
+  }
+
+  public boolean requestMemberRemoval(DistributedMember mbr, String reason) {
+    if (mbr.equals(this.address)) {
+      return false;
+    }
+    logger.fatal(LocalizedMessage.create(
+        LocalizedStrings.GroupMembershipService_MEMBERSHIP_REQUESTING_REMOVAL_OF_0_REASON_1,
+        new Object[] {mbr, reason}));
+    try {
+      services.getJoinLeave().remove((InternalDistributedMember)mbr, reason);
+    }
+    catch (RuntimeException e) {
+      Throwable problem = e;
+      if (this.shutdownCause != null) {
+        Throwable cause = this.shutdownCause;
+        // If ForcedDisconnectException occurred then report it as actual
+        // problem.
+        if (cause instanceof ForcedDisconnectException) {
+          problem = (Exception) cause;
+        } else {
+          Throwable ne = problem;
+          while (ne.getCause() != null) {
+            ne = ne.getCause();
+          }
+          try {
+            ne.initCause(this.shutdownCause);
+          }
+          catch (IllegalArgumentException selfCausation) {
+            // fix for bug 38895 - the cause is already in place
+          }
+        }
+      }
+      if (!services.getConfig().getDistributionConfig().getDisableAutoReconnect()) {
+        saveCacheXmlForReconnect();
+      }
+      listener.membershipFailure("Channel closed", problem);
+      throw new DistributedSystemDisconnectedException("Channel closed", problem);
+    }
+    return true;
+  }
+  
+  public void suspectMembers(Set members, String reason) {
+    for (Iterator it=members.iterator(); it.hasNext(); ) {
+      suspectMember((DistributedMember)it.next(), reason);
+    }
+  }
+  
+  public void suspectMember(DistributedMember mbr, String reason) {
+    if (mbr != null) {
+      this.services.getHealthMonitor().suspect((InternalDistributedMember)mbr, reason);
+    }
+  }
+
+  /**
+   * Like memberExists() this checks to see if the given ID is in the current
+   * membership view.  If it is in the view we then try to contact it
+   * to see if it's still around.  If we can't reach it, suspect processing is
+   * initiated on the member with the given reason string.
+   * @param mbr the member to verify
+   * @param reason why the check is being done (must not be blank/null)
+   * @return true if the member checks out
+   */
+  public boolean verifyMember(DistributedMember mbr, String reason) {
+    if (mbr != null && memberExists((InternalDistributedMember)mbr)) {
+      this.services.getHealthMonitor().checkSuspect(mbr, reason);
+    }
+    return false;
+  }
+
+  /**
+   * Perform the grossness associated with sending a message over
+   * a DirectChannel
+   * 
+   * @param destinations the list of destinations
+   * @param content the message
+   * @param theStats the statistics object to update
+   * @return all recipients who did not receive the message (null if
+   * all received it)
+   * @throws NotSerializableException if the message is not serializable
+   */
+  private Set directChannelSend(InternalDistributedMember[] destinations,
+      DistributionMessage content,
+      com.gemstone.gemfire.distributed.internal.DistributionStats theStats)
+      throws NotSerializableException
+  {
+    boolean allDestinations;
+    InternalDistributedMember[] keys;
+    if (content.forAll()) {
+      allDestinations = true;
+      latestViewLock.readLock().lock();
+      try {
+        Set keySet = memberToStubMap.keySet();
+        keys = new InternalDistributedMember[keySet.size()];
+        keys = (InternalDistributedMember[])keySet.toArray(keys);
+      } finally {
+        latestViewLock.readLock().unlock();
+      }
+    }
+    else {
+      allDestinations = false;
+      keys = destinations;
+    }
+
+    int sentBytes = 0;
+    try {
+      sentBytes = directChannel.send(this, keys, content,
+          this.services.getConfig().getDistributionConfig().getAckWaitThreshold(),
+          this.services.getConfig().getDistributionConfig().getAckSevereAlertThreshold());
+                                     
+      if (theStats != null)
+        theStats.incSentBytes(sentBytes);
+    }
+    catch (DistributedSystemDisconnectedException ex) {
+      if (this.shutdownCause != null) {
+        throw new DistributedSystemDisconnectedException("DistributedSystem is shutting down", this.shutdownCause);
+      } else {
+        throw ex; // see bug 41416
+      }
+    }
+    catch (ConnectExceptions ex) {
+      if (allDestinations)
+        return null;
+      
+      List members = ex.getMembers(); // We need to return this list of failures
+      
+      // SANITY CHECK:  If we fail to send a message to an existing member 
+      // of the view, we have a serious error (bug36202).
+      NetView view = services.getJoinLeave().getView(); // grab a recent view, excluding shunned members
+      
+      // Iterate through members and causes in tandem :-(
+      Iterator it_mem = members.iterator();
+      Iterator it_causes = ex.getCauses().iterator();
+      while (it_mem.hasNext()) {
+        InternalDistributedMember member = (InternalDistributedMember)it_mem.next();
+        Throwable th = (Throwable)it_causes.next();
+        
+        if (!view.contains(member)) {
+          continue;
+        }
+        logger.fatal(LocalizedMessage.create(
+            LocalizedStrings.GroupMembershipService_FAILED_TO_SEND_MESSAGE_0_TO_MEMBER_1_VIEW_2,
+            new Object[] {content, member, view}), th);
+//        Assert.assertTrue(false, "messaging contract failure");
+      }
+      return new HashSet(members);
+    } // catch ConnectionExceptions
+    catch (ToDataException e) {
+      throw e; // error in user data
+    }
+    catch (CancelException e) {
+      // not interesting, we're just shutting down
+      throw e;
+    }
+    catch (IOException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
+      }
+      if (e instanceof NotSerializableException) {
+        throw (NotSerializableException)e;
+      }
+    }
+    catch (RuntimeException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
+      }
+      throw e;
+    }
+    catch (Error e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
+      }
+      throw e;
+    }
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see com.gemstone.gemfire.distributed.internal.membership.MembershipManager#isConnected()
+   */
+  public boolean isConnected() {
+    return (this.hasConnected && !this.shutdownInProgress); 
+  }
+  
+  /**
+   * Returns true if the distributed system is in the process of auto-reconnecting.
+   * Otherwise returns false.
+   */
+  public boolean isReconnectingDS() {
+    if (this.hasConnected) {
+      // if the jgroups channel has been connected then we aren't in the
+      // middle of a reconnect attempt in this instance of the distributed system
+      return false;
+    } else {
+      return this.wasReconnectingSystem;
+    }
+  }
+  
+  /**
+   * A quorum checker is used during reconnect to perform quorum
+   * probes.  It is made available here for the UDP protocol to
+   * hand off ping-pong responses to the checker.
+   */
+  public JGroupsQuorumChecker getQuorumCheckerImpl() {
+    return this.quorumChecker;
+  }
+  
+  /**
+   * During jgroups connect the UDP protocol will invoke
+   * this method to find the DatagramSocket it should use instead of
+   * creating a new one.
+   */
+  public DatagramSocket getMembershipSocketForUDP() {
+    return this.oldDSUDPSocket;
+  }
+  
+  @Override
+  public QuorumChecker getQuorumChecker() {
+    if ( ! (this.shutdownCause instanceof ForcedDisconnectException) ) {
+      return null;
+    }
+    if (this.quorumChecker != null) {
+      return this.quorumChecker;
+    }
+    try {
+      // TODO: we really need JChannel instead of a datagram socket because jgroups
+      // doesn't have the "ping" handling that I built into the TP protocol.  Maybe we should just
+      // keep the JGroupsMessenger and use it to send PingMessages.  We'd want to
+      // bypass UNICAST and wipe out all message handlers except for the Pings.
+      DatagramSocket sock = new DatagramSocket(this.address.getPort(),
+                               this.address.getNetMember().getInetAddress());
+      JGroupsQuorumChecker impl = new JGroupsQuorumChecker(
+          services.getJoinLeave().getView(), services.getConfig().getLossThreshold(),
+          sock);
+      impl.initialize();
+      this.quorumChecker = impl;
+      return impl;
+    } catch (SocketException e) {
+      logger.warn("unable to create quorum checker", e);
+      return null;
+    }
+  }
+  
+  @Override
+  public void releaseQuorumChecker(QuorumChecker checker) {
+    ((JGroupsQuorumChecker)checker).teardown();
+    InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
+    if (system == null || !system.isConnected()) {
+      DatagramSocket sock = (DatagramSocket)checker.getMembershipInfo();
+      if (sock != null  &&  !sock.isClosed()) {
+        sock.close();
+      }
+    }
+  }
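+  // Illustrative reconnect-side sketch (hypothetical; assumes the QuorumChecker
+  // interface exposes a timed probe such as checkForQuorum(timeoutMillis)):
+  //   QuorumChecker qc = membershipManager.getQuorumChecker();
+  //   if (qc != null) {
+  //     try {
+  //       boolean haveQuorum = qc.checkForQuorum(timeoutMillis);
+  //       // only attempt to rejoin the system if a quorum of the old view responded
+  //     } finally {
+  //       membershipManager.releaseQuorumChecker(qc);
+  //     }
+  //   }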
+  
+  public Set send(InternalDistributedMember dest, DistributionMessage msg)
+    throws NotSerializableException {
+    
+    InternalDistributedMember[] dests = new InternalDistributedMember[] { dest };
+    return send(dests, msg, null);
+  }
+  
+  public Set send(InternalDistributedMember[] destinations,
+      DistributionMessage msg,
+      com.gemstone.gemfire.distributed.internal.DistributionStats theStats)
+      throws NotSerializableException
+  {
+    Set result = null;
+    boolean allDestinations = msg.forAll();
+    
+    if (!this.hasConnected) {
+      if (services.getCancelCriterion().cancelInProgress() != null) {
+        throw services.getCancelCriterion().generateCancelledException(null);
+      }
+      // If we get here, we are starting up, so just report a failure.
+      if (allDestinations)
+        return null;
+      else {
+        result = new HashSet();
+        for (int i = 0; i < destinations.length; i ++)
+          result.add(destinations[i]);
+        return result;
+      }
+    }
+
+    // Handle trivial cases
+    if (destinations == null) {
+      if (logger.isTraceEnabled())
+        logger.trace("Membership: Message send: returning early because null set passed in: '{}'", msg);
+      return null; // trivially: all recipients received the message
+    }
+    if (destinations.length == 0) {
+      if (logger.isTraceEnabled())
+        logger.trace("Membership: Message send: returning early because empty destination list passed in: '{}'", msg);
+      return null; // trivially: all recipients received the message
+    }
+
+    msg.setSender(address);
+    
+    msg.setBreadcrumbsInSender();
+    Breadcrumbs.setProblem(null);
+    
+    result = directChannelSend(destinations, msg, theStats);
+    // If the message was a broadcast, don't enumerate failures.
+    if (allDestinations)
+      return null;
+    else {
+      return result;
+    }
+  }
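+  // Illustrative caller sketch (hypothetical names): a null result means every
+  // recipient received the message; otherwise the returned set holds the members
+  // that could not be reached:
+  //   Set failures = membershipManager.send(recipients, message, stats);
+  //   if (failures != null && !failures.isEmpty()) {
+  //     // retry, or initiate suspect processing for each unreachable recipient
+  //   }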
+  
+  /**
+   * @throws DistributionException if the conduit has stopped and could not be restarted
+   */
+  public void reset() throws DistributionException
+  {
+    if (conduit != null) {
+      try {
+        conduit.restart();
+      } catch (ConnectionException e) {
+        throw new DistributionException(LocalizedStrings.GroupMembershipService_UNABLE_TO_RESTART_CONDUIT.toLocalizedString(), e);
+      }
+    }
+  }
+
+  // MembershipManager method
+  @Override
+  public void forceUDPMessagingForCurrentThread() {
+    // not currently supported by this manager
+  }
+  
+  // MembershipManager method
+  @Override
+  public void releaseUDPMessagingForCurrentThread() {
+    // not currently supported by this manager
+  }
+  
+  /**
+   * Get or create stub for given member
+   */
+  public Stub getStubForMember(InternalDistributedMember m)
+  {
+    if (shutdownInProgress) {
+      throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), this.shutdownCause);
+    }
+    // Bogus stub object if direct channels not being used
+    if (conduit == null)
+      return new Stub(m.getInetAddress(), m.getPort(), m.getVmViewId());
+    
+    // Return existing one if it is already in place
+    Stub result;
+    result = (Stub)memberToStubMap.get(m);
+    if (result != null)
+      return result;
+
+    latestViewLock.readLock().lock();
+    try {
+      // Do all of this work in a critical region to prevent
+      // members from slipping in during shutdown
+      if (shutdownInProgress())
+        return null; // don't try to create a stub during shutdown
+      if (isShunned(m))
+        return null; // don't let zombies come back to life
+      
+      // OK, create one.  Update the table to reflect the creation.
+      result = directChannel.createConduitStub(m);
+      addChannel(m, result);
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+    return result;
+  }
+
+  public InternalDistributedMember getMemberForStub(Stub s, boolean validated)
+  {
+    latestViewLock.readLock().lock();
+    try {
+      if (shutdownInProgress) {
+        throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), this.shutdownCause);
+      }
+      InternalDistributedMember result = (InternalDistributedMember)
+          stubToMemberMap.get(s);
+      if (result != null) {
+        if (validated && !this.latestView.contains(result)) {
+          // Do not return this member unless it is in the current view.
+          if (!surpriseMembers.containsKey(result)) {
+            // if not a surprise member, this stub is lingering and should be removed
+            stubToMemberMap.remove(s);
+            memberToStubMap.remove(result);
+          }
+          result = null;
+          // fall through to see if there is a newer member using the same direct port
+        }
+      }
+      if (result == null) {
+        // it may have not been added to the stub->idm map yet, so check the current view
+        for (InternalDistributedMember idm: latestView.getMembers()) {
+          if (idm.getInetAddress().equals(s.getInetAddress())
+              && idm.getDirectChannelPort() == s.getPort()) {
+            addChannel(idm, s);
+            return idm;
+          }
+        }
+      }
+      return result;
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * return the member address for the given GMS address, or null if it's not a member
+   */
+  public InternalDistributedMember getMemberForAddress(InetSocketAddress addr) {
+    latestViewLock.readLock().lock();
+    try {
+      for (InternalDistributedMember idm: latestView.getMembers()) {
+        if (idm.getInetAddress().equals(addr.getAddress())
+              && idm.getPort() == addr.getPort()) {
+            return idm;
+        }
+      }
+      return null;
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+  }
+  
+  public void setShutdown()
+  {
+    shutdownInProgress = true;
+  }
+
+  @Override
+  public boolean shutdownInProgress() {
+    // Impossible condition (bug36329): make sure that we check DM's
+    // view of shutdown here
+    return shutdownInProgress || listener.getDM().shutdownInProgress();
+  }
+  
+  /**
+   * Add a mapping from the given member to the given stub. The caller must
+   * hold {@link #latestViewLock} while invoking this method.
+   * 
+   * @param member the member to map
+   * @param theChannel the conduit stub for that member
+   */
+  protected void addChannel(InternalDistributedMember member, Stub theChannel)
+  {
+    if (theChannel != null) {
+      // Don't overwrite existing stub information with a null
+      this.memberToStubMap.put(member, theChannel);
+
+      // Can't create reverse mapping if the stub is null
+      this.stubToMemberMap.put(theChannel, member);
+    }
+  }
+
+
+  /**
+   * Clean up and create consistent new view with member removed.
+   * No uplevel events are generated.
+   * 
+   * Must be called with the {@link #latestViewLock} held.
+   */
+  protected void destroyMember(final InternalDistributedMember member,
+      boolean crashed, final String reason) {
+    if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS))
+      logger.trace(LogMarker.DISTRIBUTION_VIEWS, "Membership: destroying < {} >", member);
+    
+    // Clean up the maps
+    Stub theChannel = (Stub)memberToStubMap.remove(member);
+    if (theChannel != null) {
+      this.stubToMemberMap.remove(theChannel);
+    }
+    
+    // Make sure it is removed from the view
+    latestViewLock.writeLock().lock();
+    try {
+      if (latestView.contains(member)) {
+        NetView newView = new NetView(latestView, latestView.getViewId());
+        newView.remove(member);
+        latestView = newView;
+      }
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+    
+    surpriseMembers.remove(member);
+    
+    // Trickiness: there is a minor recursion
+    // with addShunnedMember, since it will
+    // attempt to destroy really really old members.  Performing the check
+    // here breaks the recursion.
+    if (!isShunned(member)) {
+      addShunnedMember(member);
+    }
+
+    final DirectChannel dc = directChannel;
+    if (dc != null) {
+//      if (crashed) {
+//        dc.closeEndpoint(member, reason);
+//      }
+//      else
+      // Bug 37944: make sure this is always done in a separate thread,
+      // so that shutdown conditions don't wedge the view lock
+      { // fix for bug 34010
+        Thread t = new Thread() {
+          @Override
+          public void run() {
+            try {
+              Thread.sleep(
+                  Integer.getInteger("p2p.disconnectDelay", 3000).intValue());
+            }
+            catch (InterruptedException ie) {
+              Thread.currentThread().interrupt();
+              // Keep going, try to close the endpoint.
+            }
+            if (logger.isDebugEnabled())
+              logger.debug("Membership: closing connections for departed member {}", member);
+            // close connections, but don't do membership notification since it's already been done
+            dc.closeEndpoint(member, reason, false); 
+          }
+        };
+        t.setDaemon(true);
+        t.setName("disconnect thread for " + member);
+        t.start();
+      } // fix for bug 34010
+    }
+  }
+  
+  public Stub getDirectChannel()
+  {
+    latestViewLock.readLock().lock();
+    try {
+      return (Stub)memberToStubMap.get(address);
+    } finally {
+      latestViewLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Indicate whether the given member is in the zombie list (dead or dying).
+   * This also checks the time the given member was shunned, and
+   * has the side effect of removing the member from the
+   * list if it was shunned too far in the past.
+   * 
+   * @param m the member in question
+   * @guarded.By latestViewLock
+   * @return true if the given member is a zombie
+   */
+  public boolean isShunned(DistributedMember m) {
+    latestViewLock.readLock().lock();
+    try {
+      if (!shunnedMembers.containsKey(m))
+        return false;
+      
+      // Make sure that the entry isn't stale...
+      long shunTime = ((Long)shunnedMembers.get(m)).longValue();
+      long now = System.currentTimeMillis();
+      if (shunTime + SHUNNED_SUNSET * 1000 > now)
+        return true;
+      
+      // Oh, it _is_ stale.  Remove it while we're here.
+      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_VIEWS)) {
+        logger.debug("Membership: no longer shunning < 

<TRUNCATED>
