geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [2/2] incubator-geode git commit: Revert "Removing TCPConduit's Stub ID class"
Date Sat, 12 Dec 2015 01:27:40 GMT
Revert "Removing TCPConduit's Stub ID class"

This reverts commit 5b35e43f93bfbf6d62eadf7979eb3a8b7f59b77e.

This commit was causing compilation failures.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/507f2f3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/507f2f3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/507f2f3a

Branch: refs/heads/develop
Commit: 507f2f3a905e70fcabed9b83d4dc966ef3e9e6ec
Parents: f7670e1
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Fri Dec 11 16:49:38 2015 -0800
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Fri Dec 11 17:25:39 2015 -0800

----------------------------------------------------------------------
 .../internal/DistributionManager.java           |   8 +
 .../distributed/internal/StartupMessage.java    |   1 +
 .../internal/direct/DirectChannel.java          |  93 ++++++-
 .../internal/direct/MissingStubException.java   |  37 +++
 .../internal/direct/ShunnedMemberException.java |  34 ---
 .../internal/membership/MembershipManager.java  |  29 +-
 .../gms/mgr/GMSMembershipManager.java           | 197 +++++++++++--
 .../internal/i18n/ParentLocalizedStrings.java   |   6 +-
 .../gemfire/internal/tcp/Connection.java        | 117 ++++----
 .../gemfire/internal/tcp/ConnectionTable.java   |  91 +++---
 .../internal/tcp/MemberShunnedException.java    |   7 +-
 .../gemfire/internal/tcp/ServerDelegate.java    |   5 +-
 .../com/gemstone/gemfire/internal/tcp/Stub.java | 164 +++++++++++
 .../gemfire/internal/tcp/TCPConduit.java        | 274 ++++++++++++++++---
 .../internal/DistributionManagerDUnitTest.java  |   6 +-
 .../gms/mgr/GMSMembershipManagerJUnitTest.java  |  31 ++-
 .../internal/tcp/ConnectionJUnitTest.java       |   3 +-
 17 files changed, 870 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
index e3c342a..964845c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
@@ -91,6 +91,7 @@ import com.gemstone.gemfire.internal.sequencelog.MembershipLogger;
 import com.gemstone.gemfire.internal.tcp.Connection;
 import com.gemstone.gemfire.internal.tcp.ConnectionTable;
 import com.gemstone.gemfire.internal.tcp.ReenteredConnectException;
+import com.gemstone.gemfire.internal.tcp.Stub;
 import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock;
 
 /**
@@ -2714,6 +2715,13 @@ public class DistributionManager
       return false; // no peers, we are alone.
     }
 
+    // ensure we have stubs for everyone else
+    Iterator it = allOthers.iterator();
+    while (it.hasNext()) {
+      InternalDistributedMember member = (InternalDistributedMember)it.next();
+      membershipManager.getStubForMember(member);
+    }
+
     try {
       ok = op.sendStartupMessage(allOthers, STARTUP_TIMEOUT, equivs,
           redundancyZone, enforceUniqueZone());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
index 01f8c62..96f8b60 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
@@ -37,6 +37,7 @@ import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributes
 import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.tcp.Stub;
 
 /**
  * A message that is sent to all other distribution manager when

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
index d4df3bf..14ff923 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
@@ -38,7 +38,6 @@ import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.ToDataException;
 import com.gemstone.gemfire.cache.TimeoutException;
-import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.distributed.internal.DMStats;
@@ -65,6 +64,7 @@ import com.gemstone.gemfire.internal.tcp.Connection;
 import com.gemstone.gemfire.internal.tcp.ConnectionException;
 import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
 import com.gemstone.gemfire.internal.tcp.MsgStreamer;
+import com.gemstone.gemfire.internal.tcp.Stub;
 import com.gemstone.gemfire.internal.tcp.TCPConduit;
 import com.gemstone.gemfire.internal.util.Breadcrumbs;
 import com.gemstone.gemfire.internal.util.concurrent.ReentrantSemaphore;
@@ -115,6 +115,13 @@ public class DirectChannel {
     }
     
     /**
+     * Returns the endpoint ID for the direct channel
+     */
+    public Stub getLocalStub() {
+      return conduit.getId();
+    }
+    
+    /**
      * when the initial number of members is known, this method is invoked
      * to ensure that connections to those members can be established in a
      * reasonable amount of time.  See bug 39848 
@@ -174,7 +181,7 @@ public class DirectChannel {
         this.groupOrderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS);
         this.groupUnorderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS);
         logger.info(LocalizedMessage.create(
-            LocalizedStrings.DirectChannel_GEMFIRE_P2P_LISTENER_STARTED_ON__0, conduit.getLocalAddr()));
+            LocalizedStrings.DirectChannel_GEMFIRE_P2P_LISTENER_STARTED_ON__0, conduit.getId()));
 
       }
       catch (ConnectionException ce) {
@@ -185,6 +192,48 @@ public class DirectChannel {
     }
 
  
+//   /**
+//    * 
+//    * @param addr destination for the message
+//    * @param stubMap map containing all the stubs
+//    * @param msg the original message
+//    * @param msgBuf the serialized message
+//    * @param directAck true if we need an ack
+//    * @param processorType the type (serialized, etc.)
+//    * @return if directAck, the Connection that needs the acknowledgment
+//    * @throws MissingStubException if we do not have a Stub for the recipient
+//    * @throws IOException if the message could not be sent
+//    */
+//   private Connection attemptSingleSend(MembershipManager mgr,
+//       InternalDistributedMember addr,
+//       DistributionMessage msg, ByteBuffer msgBuf,
+//       boolean directAck, int processorType)
+//       throws MissingStubException, IOException
+//   {
+//     if (!msg.deliverToSender() && localAddr.equals(addr))
+//       return null;
+
+//     if (addr == null)
+//       return null;
+//     Stub dest = mgr.getStubForMember(addr);
+//     if (dest == null) {
+//       // This should only happen if the member is no longer in the view.
+//       Assert.assertTrue(!mgr.memberExists(addr));
+//       throw new MissingStubException("No stub");
+//     }
+//     try {
+//       msgBuf.position(0); // fix for bug#30680
+//       Connection con = conduit.sendSync(dest, msgBuf, processorType, msg);
+//       if (directAck)
+//         return con;
+//       else
+//         return null;
+//     }
+//     catch(IOException t) {
+//       throw t;
+//       }
+//   }
+
   /**
    * Return how many concurrent operations should be allowed by default.
    * since 6.6, this has been raised to Integer.MAX value from the number
@@ -590,13 +639,22 @@ public class DirectChannel {
         continue;
       }
 
-      if (!mgr.memberExists(destination) || mgr.shutdownInProgress() || mgr.isShunned(destination)) {
+      Stub stub = mgr.getStubForMember(destination);
+      if (stub == null) {
         // This should only happen if the member is no longer in the view.
         if (logger.isTraceEnabled(LogMarker.DM)) {
-          logger.trace(LogMarker.DM, "Not a member: {}", destination);
+          logger.trace(LogMarker.DM, "No Stub for {}", destination);
         }
+        // The only time getStubForMember returns null is if we are
+        // shunning that member or we are shutting down.
+        // So the following assertion is wrong:
+        //Assert.assertTrue(!mgr.memberExists(destination));
+        // instead we should:
+        // Assert.assertTrue(mgr.shutdownInProgress() || mgr.isShunned(destination));
+        //but this is not worth doing and isShunned is not public.
+        // SO the assert has been deadcoded.
         if (ce == null) ce = new ConnectExceptions();
-        ce.addFailure(destination, new ShunnedMemberException(LocalizedStrings.DirectChannel_SHUNNING_0.toLocalizedString()));
+        ce.addFailure(destination, new MissingStubException(LocalizedStrings.DirectChannel_NO_STUB_0.toLocalizedString()));
       }
       else {
         try {
@@ -604,8 +662,8 @@ public class DirectChannel {
           if (ackTimeout > 0) {
             startTime = System.currentTimeMillis();
           }
-          Connection con = conduit.getConnection(destination, preserveOrder,
-              retry, startTime, ackTimeout, ackSDTimeout);
+          Connection con = conduit.getConnection(destination, stub,
+              preserveOrder, retry, startTime, ackTimeout, ackSDTimeout);
           
           con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
           cons.add(con);
@@ -765,7 +823,7 @@ public class DirectChannel {
   }
 
   
-  public void receive(DistributionMessage msg, int bytesRead) {
+  public void receive(DistributionMessage msg, int bytesRead, Stub connId) {
     if (disconnected) {
       return;
     }
@@ -786,6 +844,10 @@ public class DirectChannel {
     }
   }
 
+//  public void newMemberConnected(InternalDistributedMember member, Stub id) {
+//    receiver.newMemberConnected(member, id);
+//  }
+
   public InternalDistributedMember getLocalAddress() {
     return this.localAddr;
   }
@@ -868,6 +930,13 @@ public class DirectChannel {
     }
   }
   
+  /** Create a TCPConduit stub from a JGroups InternalDistributedMember */
+  public Stub createConduitStub(InternalDistributedMember addr) {
+    int port = addr.getDirectChannelPort();
+    Stub stub = new Stub(addr.getInetAddress(), port, addr.getVmViewId());
+    return stub;
+  }
+  
   public void closeEndpoint(InternalDistributedMember member, String reason) {
     closeEndpoint(member, reason, true);
   }
@@ -879,7 +948,7 @@ public class DirectChannel {
   public void closeEndpoint(InternalDistributedMember member, String reason, boolean notifyDisconnect) {
     TCPConduit tc = this.conduit;
     if (tc != null) {
-      tc.removeEndpoint(member, reason, notifyDisconnect);
+      tc.removeEndpoint(createConduitStub(member), reason, notifyDisconnect);
     }
   }
 
@@ -893,7 +962,7 @@ public class DirectChannel {
    *    the map to add the state to
    * @since 5.1
    */
-  public void getChannelStates(DistributedMember member, Map result)
+  public void getChannelStates(Stub member, Map result)
   {
     TCPConduit tc = this.conduit;
     if (tc != null) {
@@ -905,7 +974,7 @@ public class DirectChannel {
    * wait for the given connections to process the number of messages
    * associated with the connection in the given map
    */
-  public void waitForChannelState(DistributedMember member, Map channelState)
+  public void waitForChannelState(Stub member, Map channelState)
     throws InterruptedException
   {
     if (Thread.interrupted()) throw new InterruptedException();
@@ -918,7 +987,7 @@ public class DirectChannel {
   /**
    * returns true if there are still receiver threads for the given member
    */
-  public boolean hasReceiversFor(DistributedMember mbr) {
+  public boolean hasReceiversFor(Stub mbr) {
     return this.conduit.hasReceiversFor(mbr);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java
new file mode 100644
index 0000000..49b4486
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.direct;
+
+import com.gemstone.gemfire.GemFireCheckedException;
+
+/**
+ * Exception thrown when the TCPConduit is unable to acquire a stub
+ * for the given recipient.
+ * 
+ * @author jpenney
+ *
+ */
+public class MissingStubException extends GemFireCheckedException
+{
+
+  private static final long serialVersionUID = -6455664684151074915L;
+
+  public MissingStubException(String msg) {
+    super(msg);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java
deleted file mode 100644
index 59db762..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.distributed.internal.direct;
-
-import com.gemstone.gemfire.GemFireCheckedException;
-
-/**
- * Exception thrown when a member is no longer in the distributed system
- * 
- */
-public class ShunnedMemberException extends GemFireCheckedException
-{
-
-  private static final long serialVersionUID = -6455664684151074915L;
-
-  public ShunnedMemberException(String msg) {
-    super(msg);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
index 7416efa..a46680b 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
@@ -27,6 +27,7 @@ import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
 import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.tcp.Stub;
 
 /**
  * A MembershipManager is responsible for reporting a MemberView, as well as
@@ -73,7 +74,7 @@ public interface MembershipManager {
    * @param m the member
    * @return true if it still exists
    */
-  public boolean memberExists(DistributedMember m);
+  public boolean memberExists(InternalDistributedMember m);
   
   /**
    * Is this manager still connected?  If it has not been initialized, this
@@ -142,6 +143,25 @@ public interface MembershipManager {
   throws NotSerializableException;
   
   /**
+   * Return a {@link Stub} referring to the given member.  A <em>null</em> may
+   * be returned if the system is not employing stubs for communication.
+   * 
+   * @param m the member
+   * @return the stub
+   */
+  public Stub getStubForMember(InternalDistributedMember m);
+  
+  /**
+   * Return a {@link InternalDistributedMember} associated with the given Stub.  This
+   * method may return a null if Stubs are not being used.
+   * @param s Stub to look up
+   * @param validated true if member must be in the current view
+   * @return the member associated with the given stub, if any
+   */
+  public InternalDistributedMember getMemberForStub(Stub s, boolean validated);
+  
+  
+  /**
    * Indicates to the membership manager that the system is shutting down.
    * Typically speaking, this means that new connection attempts are to be
    * ignored and disconnect failures are to be (more) tolerated.
@@ -266,7 +286,7 @@ public interface MembershipManager {
    */
   public void warnShun(DistributedMember mbr);
   
-  public boolean addSurpriseMember(DistributedMember mbr);
+  public boolean addSurpriseMember(DistributedMember mbr, Stub stub);
   
   /** if a StartupMessage is going to reject a new member, this should be used
    * to make sure we don't keep that member on as a "surprise member"
@@ -287,11 +307,6 @@ public interface MembershipManager {
    * @return true if the member is a surprise member
    */
   public boolean isSurpriseMember(DistributedMember m);
-  
-  /**
-   * Returns true if the member is being shunned
-   */
-  public boolean isShunned(DistributedMember m);
 
   /**
    * Forces use of UDP for communications in the current thread.  UDP is

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index 7be0a3a..0b7a544 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -94,6 +94,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 import com.gemstone.gemfire.internal.shared.StringPrintWriter;
 import com.gemstone.gemfire.internal.tcp.ConnectExceptions;
 import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
+import com.gemstone.gemfire.internal.tcp.Stub;
 import com.gemstone.gemfire.internal.util.Breadcrumbs;
 
 public class GMSMembershipManager implements MembershipManager, Manager
@@ -155,6 +156,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
     boolean crashed;
     String reason;
     DistributionMessage dmsg;
+    Stub stub;
     NetView gmsView;
     
     @Override
@@ -163,7 +165,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
       sb.append("kind=");
       switch (kind) {
       case SURPRISE_CONNECT:
-        sb.append("connect; member = <" + member + ">");
+        sb.append("connect; member = <" + member + ">; stub = " + stub);
         break;
       case VIEW:
         String text = gmsView.toString();
@@ -182,10 +184,12 @@ public class GMSMembershipManager implements MembershipManager, Manager
     /**
      * Create a surprise connect event
      * @param member the member connecting
+     * @param id the stub
      */
-    StartupEvent(final InternalDistributedMember member) {
+    StartupEvent(final InternalDistributedMember member, final Stub id) {
       this.kind = SURPRISE_CONNECT;
       this.member = member;
+      this.stub = id;
     }
     /**
      * Indicate if this is a surprise connect event
@@ -278,6 +282,24 @@ public class GMSMembershipManager implements MembershipManager, Manager
   volatile boolean hasJoined;
   
   /**
+   * a map keyed on InternalDistributedMember, values are Stubs that represent direct
+   * channels to other systems
+   * 
+   * Accesses must be under the read or write lock of {@link #latestViewLock}.
+   */
+  protected final Map<InternalDistributedMember, Stub> memberToStubMap = 
+      new ConcurrentHashMap<InternalDistributedMember, Stub>();
+
+  /**
+   * a map of direct channels (Stub) to InternalDistributedMember. key instanceof Stub
+   * value instanceof InternalDistributedMember
+   * 
+   * Accesses must be under the read or write lock of {@link #latestViewLock}.
+   */
+  protected final Map<Stub, InternalDistributedMember> stubToMemberMap = 
+      new ConcurrentHashMap<Stub, InternalDistributedMember>();
+  
+  /**
    * Members of the distributed system that we believe have shut down.
    * Keys are instances of {@link InternalDistributedMember}, values are 
    * Longs indicating the time this member was shunned.
@@ -525,6 +547,12 @@ public class GMSMembershipManager implements MembershipManager, Manager
           }
         }
 
+        // fix for bug #42006, lingering old identity
+        Object oldStub = this.memberToStubMap.remove(m);
+        if (oldStub != null) {
+          this.stubToMemberMap.remove(oldStub);
+        }
+
         if (shutdownInProgress()) {
           addShunnedMember(m);
           continue; // no additions processed after shutdown begins
@@ -778,6 +806,9 @@ public class GMSMembershipManager implements MembershipManager, Manager
     
     if (directChannel != null) {
       directChannel.setLocalAddr(address);
+      Stub stub = directChannel.getLocalStub();
+      memberToStubMap.put(address, stub);
+      stubToMemberMap.put(stub, address);
     }
 
     this.hasJoined = true;
@@ -874,15 +905,17 @@ public class GMSMembershipManager implements MembershipManager, Manager
   /**
    * Process a surprise connect event, or place it on the startup queue.
    * @param member the member
+   * @param stub its stub
    */
   protected void handleOrDeferSurpriseConnect(InternalDistributedMember member) {
+    Stub stub = new Stub(member.getInetAddress(), member.getDirectChannelPort(), member.getVmViewId());
     synchronized (startupLock) {
       if (!processingEvents) {
-        startupMessages.add(new StartupEvent(member));
+        startupMessages.add(new StartupEvent(member, stub));
         return;
       }
     }
-    processSurpriseConnect(member);
+    processSurpriseConnect(member, stub);
   }
   
   public void startupMessageFailed(DistributedMember mbr, String failureMessage) {
@@ -908,9 +941,12 @@ public class GMSMembershipManager implements MembershipManager, Manager
    * been added, simply returns; else adds the member.
    * 
    * @param dm the member joining
+   * @param stub the member's stub
    */
-  public boolean addSurpriseMember(DistributedMember dm) {
+  public boolean addSurpriseMember(DistributedMember dm, 
+      Stub stub) {
     final InternalDistributedMember member = (InternalDistributedMember)dm;
+    Stub s = null;
     boolean warn = false;
     
     latestViewLock.writeLock().lock();
@@ -973,6 +1009,16 @@ public class GMSMembershipManager implements MembershipManager, Manager
           startCleanupTimer();
         } // cleanupTimer == null
 
+        // fix for bug #42006, lingering old identity
+        Object oldStub = this.memberToStubMap.remove(member);
+        if (oldStub != null) {
+          this.stubToMemberMap.remove(oldStub);
+        }
+
+        s = stub == null ? getStubForMember(member) : stub;
+        // Make sure that channel information is consistent
+        addChannel(member, s);
+
         // Ensure that the member is accounted for in the view
         // Conjure up a new view including the new member. This is necessary
         // because we are about to tell the listener about a new member, so
@@ -1108,7 +1154,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
 
         // If it's a new sender, wait our turn, generate the event
         if (isNew) {
-          shunned = !addSurpriseMember(m);
+          shunned = !addSurpriseMember(m, getStubForMember(m));
         } // isNew
       }
 
@@ -1120,7 +1166,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
     if (shunned) { // bug #41538 - shun notification must be outside synchronization to avoid hanging
       warnShun(m);
       logger.info("Membership: Ignoring message from shunned member <{}>:{}", m, msg);
-      throw new MemberShunnedException(m);
+      throw new MemberShunnedException(getStubForMember(m));
     }
     
     listener.messageReceived(msg);
@@ -1202,11 +1248,13 @@ public class GMSMembershipManager implements MembershipManager, Manager
    * grabbed a stable view if this is really a new member.
    * 
    * @param member
+   * @param stub
    */
   private void processSurpriseConnect(
-      InternalDistributedMember member) 
+      InternalDistributedMember member, 
+      Stub stub) 
   {
-    addSurpriseMember(member);
+    addSurpriseMember(member, stub);
   }
   
   /**
@@ -1228,7 +1276,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
       processView(o.gmsView.getViewId(), o.gmsView);
     }
     else if (o.isSurpriseConnect()) { // connect
-      processSurpriseConnect(o.member);
+      processSurpriseConnect(o.member, o.stub);
     }
     
     else // sanity
@@ -1402,7 +1450,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
     }
   }
 
-  public boolean memberExists(DistributedMember m) {
+  public boolean memberExists(InternalDistributedMember m) {
     latestViewLock.readLock().lock();
     NetView v = latestView;
     latestViewLock.readLock().unlock();
@@ -1477,6 +1525,12 @@ public class GMSMembershipManager implements MembershipManager, Manager
       directChannel.emergencyClose();
     }
     
+    // could we guarantee not to allocate objects?  We're using Darrel's 
+    // factory, so it's possible that an unsafe implementation could be
+    // introduced here.
+//    stubToMemberMap.clear();
+//    memberToStubMap.clear();
+    
     if (DEBUG) {
       System.err.println("DEBUG: done closing GroupMembershipService");
     }
@@ -1713,7 +1767,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
       allDestinations = true;
       latestViewLock.writeLock().lock();
       try {
-        List<InternalDistributedMember> keySet = latestView.getMembers();
+        Set keySet = memberToStubMap.keySet();
         keys = new InternalDistributedMember[keySet.size()];
         keys = (InternalDistributedMember[])keySet.toArray(keys);
       } finally {
@@ -1966,6 +2020,80 @@ public class GMSMembershipManager implements MembershipManager, Manager
     // not currently supported by this manager
   }
   
+  /**
+   * Get or create stub for given member
+   */
+  public Stub getStubForMember(InternalDistributedMember m)
+  {
+    if (shutdownInProgress) {
+      throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause());
+    }
+
+    if (services.getConfig().getDistributionConfig().getDisableTcp()) {
+      return new Stub(m.getInetAddress(), m.getPort(), m.getVmViewId());
+    }
+    
+    // Return existing one if it is already in place
+    Stub result;
+    result = (Stub)memberToStubMap.get(m);
+    if (result != null)
+      return result;
+
+    latestViewLock.writeLock().lock();
+    try {
+      // Do all of this work in a critical region to prevent
+      // members from slipping in during shutdown
+      if (shutdownInProgress())
+        return null; // don't try to create a stub during shutdown
+      if (isShunned(m))
+        return null; // don't let zombies come back to life
+      
+      // OK, create one.  Update the table to reflect the creation.
+      result = directChannel.createConduitStub(m);
+      addChannel(m, result);
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+   return result;
+  }
+
+  public InternalDistributedMember getMemberForStub(Stub s, boolean validated)
+  {
+    latestViewLock.writeLock().lock();
+    try {
+      if (shutdownInProgress) {
+        throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause());
+      }
+      InternalDistributedMember result = (InternalDistributedMember)
+          stubToMemberMap.get(s);
+      if (result != null) {
+        if (validated && !this.latestView.contains(result)) {
+          // Do not return this member unless it is in the current view.
+          if (!surpriseMembers.containsKey(result)) {
+            // if not a surprise member, this stub is lingering and should be removed
+            stubToMemberMap.remove(s);
+            memberToStubMap.remove(result);
+          }
+          result = null;
+          // fall through to see if there is a newer member using the same direct port
+        }
+      }
+      if (result == null) {
+        // it may have not been added to the stub->idm map yet, so check the current view
+        for (InternalDistributedMember idm: latestView.getMembers()) {
+          if (GMSUtil.compareAddresses(idm.getInetAddress(), s.getInetAddress()) == 0
+              && idm.getDirectChannelPort() == s.getPort()) {
+            addChannel(idm, s);
+            return idm;
+          }
+        }
+      }
+      return result;
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+  }
+
   public void setShutdown()
   {
     latestViewLock.writeLock().lock();
@@ -1981,6 +2109,24 @@ public class GMSMembershipManager implements MembershipManager, Manager
     return shutdownInProgress || (dm != null && dm.shutdownInProgress());
   }
   
+  /**
+   * Add a mapping from the given member to the given stub. Must
+   * be called with {@link #latestViewLock} held.
+   * 
+   * @param member
+   * @param theChannel
+   */
+  protected void addChannel(InternalDistributedMember member, Stub theChannel)
+  {
+    if (theChannel != null) {
+      // Don't overwrite existing stub information with a null
+      this.memberToStubMap.put(member, theChannel);
+
+      // Can't create reverse mapping if the stub is null
+      this.stubToMemberMap.put(theChannel, member);
+    }
+  }
+
 
   /**
    * Clean up and create consistent new view with member removed.
@@ -1991,6 +2137,12 @@ public class GMSMembershipManager implements MembershipManager, Manager
   protected void destroyMember(final InternalDistributedMember member,
       boolean crashed, final String reason) {
     
+    // Clean up the maps
+    Stub theChannel = (Stub)memberToStubMap.remove(member);
+    if (theChannel != null) {
+      this.stubToMemberMap.remove(theChannel);
+    }
+    
     // Make sure it is removed from the view
     latestViewLock.writeLock().lock();
     try {
@@ -2213,11 +2365,12 @@ public class GMSMembershipManager implements MembershipManager, Manager
   /* non-thread-owned serial channels and high priority channels are not
    * included
    */
-  public Map getChannelStates(DistributedMember member, boolean includeMulticast) {
+  public Map getMessageState(DistributedMember member, boolean includeMulticast) {
     Map result = new HashMap();
+    Stub stub = (Stub)memberToStubMap.get(member);
     DirectChannel dc = directChannel;
-    if (dc != null) {
-      dc.getChannelStates(member, result);
+    if (stub != null && dc != null) {
+      dc.getChannelStates(stub, result);
     }
     services.getMessenger().getMessageState((InternalDistributedMember)member, result, includeMulticast);
     return result;
@@ -2228,8 +2381,15 @@ public class GMSMembershipManager implements MembershipManager, Manager
   {
     if (Thread.interrupted()) throw new InterruptedException();
     DirectChannel dc = directChannel;
-    if (dc != null) {
-      dc.waitForChannelState(otherMember, channelState);
+    Stub stub;
+    latestViewLock.writeLock().lock();
+    try {
+      stub = (Stub)memberToStubMap.get(otherMember);
+    } finally {
+      latestViewLock.writeLock().unlock();
+    }
+    if (dc != null && stub != null) {
+      dc.waitForChannelState(stub, state);
     }
     services.getMessenger().waitForMessageState((InternalDistributedMember)otherMember, state);
   }
@@ -2245,6 +2405,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
     boolean result = false;
     DirectChannel dc = directChannel;
     InternalDistributedMember idm = (InternalDistributedMember)mbr;
+    Stub stub = new Stub(idm.getInetAddress(), idm.getPort(), idm.getVmViewId());
     int memberTimeout = this.services.getConfig().getDistributionConfig().getMemberTimeout();
     long pauseTime = (memberTimeout < 1000) ? 100 : memberTimeout / 10;
     boolean wait;
@@ -2252,7 +2413,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
     do {
       wait = false;
       if (dc != null) {
-        if (dc.hasReceiversFor(idm)) {
+        if (dc.hasReceiversFor(stub)) {
           wait = true;
         }
         if (wait && logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
index 780fe18..7bb97b9 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
@@ -1109,7 +1109,7 @@ class ParentLocalizedStrings {
   public static final StringId TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED = new StringId(2086, "Ending reconnect attempt because {0} has disappeared.");
   public static final StringId TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED = new StringId(2087, "Ending reconnect attempt to {0} because shutdown has started.");
   public static final StringId TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1 = new StringId(2088, "Error sending message to {0} (will reattempt): {1}");
-  public static final StringId TCPConduit_EXCEPTION_CREATING_SERVERSOCKET = new StringId(2089, "While creating ServerSocket on port {0} with address {1}");
+  public static final StringId TCPConduit_EXCEPTION_CREATING_SERVERSOCKET = new StringId(2089, "While creating ServerSocket and Stub on port {0} with address {1}");
   public static final StringId TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT = new StringId(2090, "exception parsing p2p.idleConnectionTimeout");
   public static final StringId TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE = new StringId(2091, "exception parsing p2p.tcpBufferSize");
   public static final StringId TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1 = new StringId(2092, "Failed to accept connection from {0} because {1}");
@@ -1444,7 +1444,7 @@ class ParentLocalizedStrings {
   public static final StringId Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1 = new StringId(2432, "Detected wrong version of GemFire product during handshake. Expected  {0}  but found  {1}");
   public static final StringId Connection_FORCED_DISCONNECT_SENT_TO_0 = new StringId(2433, "Forced disconnect sent to  {0}");
   public static final StringId Connection_HANDSHAKE_FAILED = new StringId(2434, "Handshake failed");
-  public static final StringId Connection_MEMBER_LEFT_THE_GROUP = new StringId(2435, "Member {0}  left the group");
+  public static final StringId Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP = new StringId(2435, "Member for stub  {0}  left the group");
   public static final StringId Connection_NOT_CONNECTED_TO_0 = new StringId(2436, "Not connected to  {0}");
   public static final StringId Connection_NULL_CONNECTIONTABLE = new StringId(2437, "Null ConnectionTable");
   public static final StringId Connection_SOCKET_HAS_BEEN_CLOSED = new StringId(2438, "socket has been closed");
@@ -1542,7 +1542,7 @@ class ParentLocalizedStrings {
   public static final StringId DefaultQuery_WHEN_QUERYING_A_PARTITIONED_REGION_THE_PROJECTIONS_MUST_NOT_REFERENCE_ANY_REGIONS = new StringId(2530, "When querying a Partitioned Region, the projections must not reference any regions");
   public static final StringId DestroyMessage_FAILED_SENDING_0 = new StringId(2531, "Failed sending < {0} >");
   public static final StringId DirectChannel_COMMUNICATIONS_DISCONNECTED = new StringId(2532, "communications disconnected");
-  public static final StringId DirectChannel_SHUNNING_0 = new StringId(2533, "Member is being shunned: {0}");
+  public static final StringId DirectChannel_NO_STUB_0 = new StringId(2533, "No stub {0}");
   public static final StringId DirectChannel_UNKNOWN_ERROR_SERIALIZING_MESSAGE = new StringId(2534, "Unknown error serializing message");
   public static final StringId DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING = new StringId(2535, "An IOException was thrown while serializing.");
   public static final StringId DiskEntry_DISK_REGION_IS_NULL = new StringId(2536, "Disk region is null");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index 74660da..f918812 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -50,7 +50,6 @@ import org.apache.logging.log4j.Logger;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 import com.gemstone.gemfire.distributed.internal.ConflationKey;
 import com.gemstone.gemfire.distributed.internal.DM;
@@ -73,6 +72,7 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.DSFIDFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
@@ -222,6 +222,11 @@ public class Connection implements Runnable {
   /** the ID string of the conduit (for logging) */
   String conduitIdStr;
 
+  /** remoteId identifies the remote conduit's listener.  It does NOT
+     identify the "port" that this connection's socket is attached
+     to, which is a different thing altogether */
+  Stub remoteId;
+
   /** Identifies the java group member on the other side of the connection. */
   InternalDistributedMember remoteAddr;
 
@@ -796,7 +801,7 @@ public class Connection implements Runnable {
           }
           if (success) {
             if (this.isReceiver) {
-              needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr);
+              needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr, this.remoteId);
               if (needToClose) {
                 reason = "this member is shunned";
               }
@@ -840,7 +845,7 @@ public class Connection implements Runnable {
    * @param beingSick
    */
   private void asyncClose(boolean beingSick) {
-    // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
+    // note: remoteId may be null if this is a receiver that hasn't finished its handshake
     
     // we do the close in a background thread because the operation may hang if 
     // there is a problem with the network.  See bug #46659
@@ -1013,7 +1018,8 @@ public class Connection implements Runnable {
   protected static Connection createSender(final MembershipManager mgr,
                                            final ConnectionTable t,
                                            final boolean preserveOrder,
-                                           final DistributedMember remoteAddr,
+                                           final Stub key,
+                                           final InternalDistributedMember remoteAddr,
                                            final boolean sharedResource,
                                            final long startTime,
                                            final long ackTimeout,
@@ -1068,8 +1074,9 @@ public class Connection implements Runnable {
         }
         if (firstTime) {
           firstTime = false;
-          if (!mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress()) {
-            throw new IOException("Member " + remoteAddr + " left the system");
+          InternalDistributedMember m = mgr.getMemberForStub(key, true);
+          if (m == null) {
+            throw new IOException("Member for stub " + key + " left the group");
           }
         }
         else {
@@ -1077,7 +1084,7 @@ public class Connection implements Runnable {
           // alert listener should not prevent cache operations from continuing
           if (AlertAppender.isThreadAlerting()) {
             // do not change the text of this exception - it is looked for in exception handlers
-            throw new IOException("Cannot form connection to alert listener " + remoteAddr);
+            throw new IOException("Cannot form connection to alert listener " + key);
           }
             
           // Wait briefly...
@@ -1090,19 +1097,20 @@ public class Connection implements Runnable {
             t.getConduit().getCancelCriterion().checkCancelInProgress(ie);
           }
           t.getConduit().getCancelCriterion().checkCancelInProgress(null);
-          if (giveUpOnMember(mgr, remoteAddr)) {
-            throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr));
+          InternalDistributedMember m = mgr.getMemberForStub(key, true);
+          if (m == null) {
+            throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key));
           }
           if (!warningPrinted) {
             warningPrinted = true;
-            logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, remoteAddr));
+            logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, m));
           }          
           t.getConduit().stats.incReconnectAttempts();
         }
         //create connection
         try {
           conn = null;
-          conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource);
+          conn = new Connection(mgr, t, preserveOrder, key, remoteAddr, sharedResource);
         }
         catch (javax.net.ssl.SSLHandshakeException se) {
           // no need to retry if certificates were rejected
@@ -1110,7 +1118,8 @@ public class Connection implements Runnable {
         }
         catch (IOException ioe) {
           // Only give up if the member leaves the view.
-          if (giveUpOnMember(mgr, remoteAddr)) {
+          InternalDistributedMember m = mgr.getMemberForStub(key, true);
+          if (m == null) {
             throw ioe;
           }
           t.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -1121,7 +1130,7 @@ public class Connection implements Runnable {
             connectionErrorLogged = true; // otherwise change to use 100ms intervals causes a lot of these
             logger.info(LocalizedMessage.create(
                 LocalizedStrings.Connection_CONNECTION_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
-                new Object[] {sharedResource, preserveOrder, remoteAddr, ioe}));
+                new Object[] {sharedResource, preserveOrder, m, ioe}));
           }
         } // IOException
         finally {
@@ -1137,8 +1146,9 @@ public class Connection implements Runnable {
               // something went wrong while reading the handshake
               // and the socket was closed or this guy sent us a
               // ShutdownMessage
-              if (giveUpOnMember(mgr, remoteAddr)) {
-                throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr));
+              InternalDistributedMember m = mgr.getMemberForStub(key, true);
+              if (m == null) {
+                throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key));
               }
               t.getConduit().getCancelCriterion().checkCancelInProgress(null);
               // no success but no need to log; just retry
@@ -1151,7 +1161,8 @@ public class Connection implements Runnable {
             throw e;
           }
           catch (ConnectionException e) {
-            if (giveUpOnMember(mgr, remoteAddr)) {
+            InternalDistributedMember m = mgr.getMemberForStub(key, true);
+            if (m == null) {
               IOException ioe = new IOException(LocalizedStrings.Connection_HANDSHAKE_FAILED.toLocalizedString());
               ioe.initCause(e);
               throw ioe;
@@ -1159,16 +1170,17 @@ public class Connection implements Runnable {
             t.getConduit().getCancelCriterion().checkCancelInProgress(null);
             logger.info(LocalizedMessage.create(
                 LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
-                new Object[] {sharedResource, preserveOrder, remoteAddr ,e}));
+                new Object[] {sharedResource, preserveOrder, m,e}));
           }
           catch (IOException e) {
-            if (giveUpOnMember(mgr, remoteAddr)) {
+            InternalDistributedMember m = mgr.getMemberForStub(key, true);
+            if (m == null) {
               throw e;
             }
             t.getConduit().getCancelCriterion().checkCancelInProgress(null);
             logger.info(LocalizedMessage.create(
                 LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
-                new Object[] {sharedResource, preserveOrder, remoteAddr ,e}));
+                new Object[] {sharedResource, preserveOrder, m,e}));
             if (!sharedResource && "Too many open files".equals(e.getMessage())) {
               t.fileDescriptorsExhausted();
             }
@@ -1208,7 +1220,7 @@ public class Connection implements Runnable {
     if (conn == null) {
       throw new ConnectionException(
         LocalizedStrings.Connection_CONNECTION_FAILED_CONSTRUCTION_FOR_PEER_0
-          .toLocalizedString(remoteAddr));
+          .toLocalizedString(mgr.getMemberForStub(key, true)));
     }
     if (preserveOrder && BATCH_SENDS) {
       conn.createBatchSendBuffer();
@@ -1216,15 +1228,12 @@ public class Connection implements Runnable {
     conn.finishedConnecting = true;
     return conn;
   }
-  
-  private static boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) {
-    return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
-  }
 
-  private void setRemoteAddr(DistributedMember m) {
+  private void setRemoteAddr(InternalDistributedMember m, Stub stub) {
     this.remoteAddr = this.owner.getDM().getCanonicalId(m);
+    this.remoteId = stub;
     MembershipManager mgr = this.owner.owner.getMembershipManager();
-    mgr.addSurpriseMember(m);
+    mgr.addSurpriseMember(m, stub);
   }
   
   /** creates a new connection to a remote server.
@@ -1234,11 +1243,11 @@ public class Connection implements Runnable {
   private Connection(MembershipManager mgr,
                      ConnectionTable t,
                      boolean preserveOrder,
-                     DistributedMember remoteID,
+                     Stub key,
+                     InternalDistributedMember remoteAddr,
                      boolean sharedResource)
     throws IOException, DistributedSystemDisconnectedException
   {    
-    InternalDistributedMember remoteAddr = (InternalDistributedMember)remoteID;
     if (t == null) {
       throw new IllegalArgumentException(LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString());
     }
@@ -1246,7 +1255,7 @@ public class Connection implements Runnable {
     this.owner = t;
     this.sharedResource = sharedResource;
     this.preserveOrder = preserveOrder;
-    setRemoteAddr(remoteAddr);
+    setRemoteAddr(remoteAddr, key);
     this.conduitIdStr = this.owner.getConduit().getId().toString();
     this.handshakeRead = false;
     this.handshakeCancelled = false;
@@ -1256,7 +1265,7 @@ public class Connection implements Runnable {
 
     // connect to listening socket
 
-    InetSocketAddress addr = new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
+    InetSocketAddress addr = new InetSocketAddress(remoteId.getInetAddress(), remoteId.getPort());
     if (useNIO()) {
       SocketChannel channel = SocketChannel.open();
       this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
@@ -1316,15 +1325,15 @@ public class Connection implements Runnable {
     else {
       if (TCPConduit.useSSL) {
         // socket = javax.net.ssl.SSLSocketFactory.getDefault()
-        //  .createSocket(remoteAddr.getInetAddress(), remoteAddr.getPort());
+        //  .createSocket(remoteId.getInetAddress(), remoteId.getPort());
         int socketBufferSize = sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
-        this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort(), socketBufferSize );
+        this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteId.getInetAddress(), remoteId.getPort(), socketBufferSize );
         // Set the receive buffer size local fields. It has already been set in the socket.
         setSocketBufferSize(this.socket, false, socketBufferSize, true);
         setSendBufferSize(this.socket);
       }
       else {
-        //socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort());
+        //socket = new Socket(remoteId.getInetAddress(), remoteId.getPort());
         Socket s = new Socket();
         this.socket = s;
         s.setTcpNoDelay(true);
@@ -1630,8 +1639,8 @@ public class Connection implements Runnable {
     // we can't wait for the reader thread when running in an IBM JRE.  See
     // bug 41889
     if (this.owner.owner.config.getEnableNetworkPartitionDetection() ||
-        this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE ||
-        this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
+        this.owner.owner.getLocalId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE ||
+        this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
       isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
     }
     {
@@ -1680,16 +1689,16 @@ public class Connection implements Runnable {
               // Only remove endpoint if sender.
               if (this.finishedConnecting) {
                 // only remove endpoint if our constructor finished
-                this.owner.removeEndpoint(this.remoteAddr, reason);
+                this.owner.removeEndpoint(this.remoteId, reason);
               }
             }
           }
           else {
-            this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
+            this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this);
           }
         }
         else if (!this.isReceiver) {
-          this.owner.removeThreadConnection(this.remoteAddr, this);
+          this.owner.removeThreadConnection(this.remoteId, this);
         }
       }
       else {
@@ -1697,10 +1706,10 @@ public class Connection implements Runnable {
         // has never added this Connection to its maps since
         // the calls in this block use our identity to do the removes.
         if (this.sharedResource) {
-          this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
+          this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this);
         }
         else if (!this.isReceiver) {
-          this.owner.removeThreadConnection(this.remoteAddr, this);
+          this.owner.removeThreadConnection(this.remoteId, this);
         }
       }
     }
@@ -1744,7 +1753,7 @@ public class Connection implements Runnable {
     } finally {
       // bug36060: do the socket close within a finally block
       if (logger.isDebugEnabled()) {
-        logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr);
+        logger.debug("Stopping {} for {}", p2pReaderName(), remoteId);
       }
       initiateSuspicionIfSharedUnordered();
       if (this.isReceiver) {
@@ -2329,7 +2338,8 @@ public class Connection implements Runnable {
                     .toLocalizedString(new Object[]{new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)}));
               }
               InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
-              setRemoteAddr(remote);
+              Stub stub = new Stub(remote.getInetAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId());
+              setRemoteAddr(remote, stub);
               Thread.currentThread().setName(LocalizedStrings.Connection_P2P_MESSAGE_READER_FOR_0.toLocalizedString(this.remoteAddr, this.socket.getPort()));
               this.sharedResource = dis.readBoolean();
               this.preserveOrder = dis.readBoolean();
@@ -2367,7 +2377,7 @@ public class Connection implements Runnable {
               }
               
               if (logger.isDebugEnabled()) {
-                logger.debug("{} remoteAddr is {} {}", p2pReaderName(), this.remoteAddr,
+                logger.debug("{} remoteId is {} {}", p2pReaderName(), this.remoteId,
                     (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
               }
 
@@ -2545,7 +2555,7 @@ public class Connection implements Runnable {
     throws IOException, ConnectionException
   {
     if (!connected) {
-      throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteAddr));
+      throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteId));
     }
     if (this.batchFlusher != null) {
       batchSend(buffer);
@@ -2768,7 +2778,7 @@ public class Connection implements Runnable {
         if (this.disconnectRequested) {
           buffer.position(origBufferPos);
           // we have given up so just drop this message.
-          throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteAddr));
+          throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteId));
         }
         if (!force && !this.asyncQueuingInProgress) {
           // reset buffer since we will be sending it. This fixes bug 34832
@@ -2970,7 +2980,7 @@ public class Connection implements Runnable {
     }
     DM dm = this.owner.getDM();
     if (dm == null) {
-      this.owner.removeEndpoint(this.remoteAddr, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString());
+      this.owner.removeEndpoint(this.remoteId, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString());
       return;
     }
     dm.getMembershipManager().requestMemberRemoval(this.remoteAddr, 
@@ -2991,7 +3001,7 @@ public class Connection implements Runnable {
         return;
       }
     }
-    this.owner.removeEndpoint(this.remoteAddr, 
+    this.owner.removeEndpoint(this.remoteId, 
                               LocalizedStrings.Connection_FORCE_DISCONNECT_TIMED_OUT.toLocalizedString());
     if (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) {
       if (logger.isDebugEnabled()) {
@@ -3100,7 +3110,7 @@ public class Connection implements Runnable {
       stats.incAsyncThreads(-1);
       stats.incAsyncQueues(-1);
       if (logger.isDebugEnabled()) {
-        logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteAddr, remoteAddr);
+        logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteId, remoteAddr);
       }
     }
     } finally {
@@ -3827,7 +3837,8 @@ public class Connection implements Runnable {
                   throw new IllegalStateException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1.toLocalizedString(new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)}));
                 }
                 InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
-                setRemoteAddr(remote);
+                Stub stub = new Stub(remote.getInetAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId());
+                setRemoteAddr(remote, stub);
                 this.sharedResource = dis.readBoolean();
                 this.preserveOrder = dis.readBoolean();
                 this.uniqueId = dis.readLong();
@@ -3886,7 +3897,7 @@ public class Connection implements Runnable {
                 return;
               }
               if (logger.isDebugEnabled()) {
-                logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr,
+                logger.debug("P2P handshake remoteId is {}{}", this.remoteId,
                     (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
               }
               try {
@@ -4020,6 +4031,12 @@ public class Connection implements Runnable {
     this.accessed = true;
   }
 
+  /** returns the ConnectionKey stub representing the other side of
+      this connection (host:port) */
+  public final Stub getRemoteId() {
+    return remoteId;
+  }
+
   /** return the DM id of the guy on the other side of this connection.
    */
   public final InternalDistributedMember getRemoteAddress() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 3816efe..bac356c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
@@ -61,7 +60,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 
 /** <p>ConnectionTable holds all of the Connection objects in a conduit.
     Connections represent a pipe between two endpoints represented
-    by generic DistributedMembers.</p>
+    by generic Stubs.</p>
 
     @author Bruce Schuchardt
     @author Darrel Schneider
@@ -346,7 +345,7 @@ public class ConnectionTable  {
   /**
    * Process a newly created PendingConnection
    * 
-   * @param id DistributedMember on which the connection is created
+   * @param id Stub on which the connection is created
    * @param sharedResource whether the connection is used by multiple threads
    * @param preserveOrder whether to preserve order
    * @param m map to add the connection to
@@ -358,7 +357,7 @@ public class ConnectionTable  {
    * @throws IOException if unable to connect
    * @throws DistributedSystemDisconnectedException
    */
-  private Connection handleNewPendingConnection(DistributedMember id, boolean sharedResource,
+  private Connection handleNewPendingConnection(Stub id, boolean sharedResource,
       boolean preserveOrder,
       Map m, PendingConnection pc, long startTime, long ackThreshold, long ackSAThreshold)
       throws IOException, DistributedSystemDisconnectedException
@@ -367,7 +366,7 @@ public class ConnectionTable  {
     Connection con = null;
     try {
       con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder,
-                                    id,
+                                    id, this.owner.getMemberForStub(id, false),
                                     sharedResource,
                                     startTime, ackThreshold, ackSAThreshold);
       this.owner.stats.incSenders(sharedResource, preserveOrder);
@@ -443,7 +442,7 @@ public class ConnectionTable  {
    * unordered or conserve-sockets
    * note that unordered connections are currently always shared
    * 
-   * @param id the DistributedMember on which we are creating a connection
+   * @param id the Stub on which we are creating a connection
    * @param threadOwnsResources whether unordered conn is owned by the current thread
    * @param preserveOrder whether to preserve order
    * @param startTime the ms clock start time for the operation
@@ -453,7 +452,7 @@ public class ConnectionTable  {
    * @throws IOException if unable to create the connection
    * @throws DistributedSystemDisconnectedException
    */
-  private Connection getUnorderedOrConserveSockets(DistributedMember id, 
+  private Connection getUnorderedOrConserveSockets(Stub id, 
       boolean threadOwnsResources, boolean preserveOrder,
       long startTime, long ackTimeout, long ackSATimeout)
     throws IOException, DistributedSystemDisconnectedException
@@ -528,7 +527,7 @@ public class ConnectionTable  {
    * @throws IOException if the connection could not be created
    * @throws DistributedSystemDisconnectedException
    */
-  Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout, long ackSATimeout) 
+  Connection getOrderedAndOwned(Stub id, long startTime, long ackTimeout, long ackSATimeout) 
       throws IOException, DistributedSystemDisconnectedException  {
     Connection result = null;
     
@@ -567,7 +566,7 @@ public class ConnectionTable  {
     // OK, we have to create a new connection.
     result = Connection.createSender(owner.getMembershipManager(), 
         this, true /* preserveOrder */, id,
-        false /* shared */,
+        this.owner.getMemberForStub(id, false), false /* shared */,
         startTime, ackTimeout, ackSATimeout);
     if (logger.isDebugEnabled()) {
       logger.debug("ConnectionTable: created an ordered connection: {}", result);
@@ -584,7 +583,7 @@ public class ConnectionTable  {
     
     ArrayList al = (ArrayList)this.threadConnectionMap.get(id);
     if (al == null) {
-      // First connection for this DistributedMember.  Make sure list for this
+      // First connection for this Stub.  Make sure list for this
       // stub is created if it isn't already there.
       al = new ArrayList();
       
@@ -652,7 +651,7 @@ public class ConnectionTable  {
   
   /**
    * Get a new connection
-   * @param id the DistributedMember on which to create the connection
+   * @param id the Stub on which to create the connection
    * @param preserveOrder whether order should be preserved
    * @param startTime the ms clock start time
    * @param ackTimeout the ms ack-wait-threshold, or zero
@@ -661,7 +660,7 @@ public class ConnectionTable  {
    * @throws java.io.IOException if the connection could not be created
    * @throws DistributedSystemDisconnectedException
    */
-  protected Connection get(DistributedMember id, boolean preserveOrder,
+  protected Connection get(Stub id, boolean preserveOrder,
       long startTime, long ackTimeout, long ackSATimeout) 
       throws java.io.IOException, DistributedSystemDisconnectedException
   {
@@ -839,38 +838,34 @@ public class ConnectionTable  {
   /**
    * Return true if our owner already knows that this endpoint is departing 
    */
-  protected boolean isEndpointShuttingDown(DistributedMember id) {
-    return giveUpOnMember(owner.getDM().getMembershipManager(), id);
+  protected boolean isEndpointShuttingDown(Stub stub) {
+    return this.owner.getMemberForStub(stub, true) == null;
   }
   
-  protected boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) {
-    return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
-  }
-
   /** remove an endpoint and notify the membership manager of the departure */
-  protected void removeEndpoint(DistributedMember stub, String reason) {
+  protected void removeEndpoint(Stub stub, String reason) {
     removeEndpoint(stub, reason, true);
   }
 
-  protected void removeEndpoint(DistributedMember memberID, String reason, boolean notifyDisconnect) {
+  protected void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) {
     if (this.closed) {
       return;
     }
     boolean needsRemoval = false;
     synchronized (this.orderedConnectionMap) {
-      if (this.orderedConnectionMap.get(memberID) != null)
+      if (this.orderedConnectionMap.get(stub) != null)
         needsRemoval = true;
     }
     if (!needsRemoval) {
       synchronized (this.unorderedConnectionMap) {
-        if (this.unorderedConnectionMap.get(memberID) != null)
+        if (this.unorderedConnectionMap.get(stub) != null)
           needsRemoval = true;
       }
     }
     if (!needsRemoval) {
       ConcurrentMap cm = this.threadConnectionMap;
       if (cm != null) {
-        ArrayList al = (ArrayList)cm.get(memberID);
+        ArrayList al = (ArrayList)cm.get(stub);
         needsRemoval = al != null && al.size() > 0;
       }
     }
@@ -878,14 +873,14 @@ public class ConnectionTable  {
     if (needsRemoval) {
       InternalDistributedMember remoteAddress = null;
       synchronized (this.orderedConnectionMap) {
-        Object c = this.orderedConnectionMap.remove(memberID);
+        Object c = this.orderedConnectionMap.remove(stub);
         if (c instanceof Connection) {
           remoteAddress = ((Connection) c).getRemoteAddress();
         }
         closeCon(reason, c);
       }
       synchronized (this.unorderedConnectionMap) {
-        Object c = this.unorderedConnectionMap.remove(memberID);
+        Object c = this.unorderedConnectionMap.remove(stub);
         if (remoteAddress == null && (c instanceof Connection)) {
           remoteAddress = ((Connection) c).getRemoteAddress();
         }
@@ -895,7 +890,7 @@ public class ConnectionTable  {
       {
         ConcurrentMap cm = this.threadConnectionMap;
         if (cm != null) {
-          ArrayList al = (ArrayList)cm.remove(memberID);
+          ArrayList al = (ArrayList)cm.remove(stub);
           if (al != null) {
             synchronized (al) {
               for (Iterator it=al.iterator(); it.hasNext();) {
@@ -917,7 +912,7 @@ public class ConnectionTable  {
         for (Iterator it=connectingSockets.entrySet().iterator(); it.hasNext(); ) {
           Map.Entry entry = (Map.Entry)it.next();
           ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue();
-          if (info.peerAddress.equals(((InternalDistributedMember)memberID).getInetAddress())) {
+          if (info.peerAddress.equals(stub.getInetAddress())) {
             toRemove.add(entry.getKey());
             it.remove();
           }
@@ -930,7 +925,7 @@ public class ConnectionTable  {
         }
         catch (IOException e) {
           if (logger.isDebugEnabled()) {
-            logger.debug("caught exception while trying to close connecting socket for {}", memberID, e);
+            logger.debug("caught exception while trying to close connecting socket for {}", stub, e);
           }
         }
       }
@@ -942,7 +937,7 @@ public class ConnectionTable  {
       synchronized (this.receivers) {
         for (Iterator it=receivers.iterator(); it.hasNext();) {
           Connection con = (Connection)it.next();
-          if (memberID.equals(con.getRemoteAddress())) {
+          if (stub.equals(con.getRemoteId())) {
             it.remove();
             toRemove.add(con);
           }
@@ -952,13 +947,10 @@ public class ConnectionTable  {
         Connection con = (Connection)it.next();
         closeCon(reason, con);
       }
+      // call memberDeparted after doing the closeCon calls
+      // so it can recursively call removeEndpoint
       if (notifyDisconnect) {
-        // Before the removal of TCPConduit Stub addresses this used
-        // to call MembershipManager.getMemberForStub, which checked
-        // for a shutdown in progress and threw this exception:
-        if (owner.getDM().shutdownInProgress()) {
-          throw new DistributedSystemDisconnectedException("Shutdown in progress", owner.getDM().getMembershipManager().getShutdownCause());
-        }
+        owner.getMemberForStub(stub, false);
       }
       
       if (remoteAddress != null) {
@@ -972,11 +964,11 @@ public class ConnectionTable  {
   }
   
   /** check to see if there are still any receiver threads for the given end-point */
-  protected boolean hasReceiversFor(DistributedMember endPoint) {
+  protected boolean hasReceiversFor(Stub endPoint) {
     synchronized (this.receivers) {
       for (Iterator it=receivers.iterator(); it.hasNext();) {
         Connection con = (Connection)it.next();
-        if (endPoint.equals(con.getRemoteAddress())) {
+        if (endPoint.equals(con.getRemoteId())) {
           return true;
         }
       }
@@ -984,7 +976,7 @@ public class ConnectionTable  {
     return false;
   }
   
-  private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub, Connection c) {
+  private static void removeFromThreadConMap(ConcurrentMap cm, Stub stub, Connection c) {
     if (cm != null) {
       ArrayList al = (ArrayList)cm.get(stub);
       if (al != null) {
@@ -994,7 +986,7 @@ public class ConnectionTable  {
       }
     }
   }
-  protected void removeThreadConnection(DistributedMember stub, Connection c) {
+  protected void removeThreadConnection(Stub stub, Connection c) {
     /*if (this.closed) {
       return;
     }*/
@@ -1009,7 +1001,7 @@ public class ConnectionTable  {
       } // synchronized
     } // m != null
   }
-  void removeSharedConnection(String reason, DistributedMember stub, boolean ordered, Connection c) {
+  void removeSharedConnection(String reason, Stub stub, boolean ordered, Connection c) {
     if (this.closed) {
       return;
     }
@@ -1062,7 +1054,7 @@ public class ConnectionTable  {
        Iterator it = m.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry me = (Map.Entry)it.next();
-         DistributedMember stub = (DistributedMember)me.getKey();
+         Stub stub = (Stub)me.getKey();
          Connection c = (Connection)me.getValue();
          removeFromThreadConMap(this.threadConnectionMap, stub, c);
          it.remove();
@@ -1087,7 +1079,7 @@ public class ConnectionTable  {
    * from being formed or new messages from being sent
    * @since 5.1
    */
-  protected void getThreadOwnedOrderedConnectionState(DistributedMember member,
+  protected void getThreadOwnedOrderedConnectionState(Stub member,
       Map result) {
 
     ConcurrentMap cm = this.threadConnectionMap;
@@ -1113,7 +1105,7 @@ public class ConnectionTable  {
    * wait for the given incoming connections to receive at least the associated
    * number of messages
    */
-  protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member,
+  protected void waitForThreadOwnedOrderedConnectionState(Stub member,
       Map connectionStates) throws InterruptedException {
     if (Thread.interrupted()) throw new InterruptedException(); // wisest to do this before the synchronize below
     List r = null;
@@ -1123,14 +1115,14 @@ public class ConnectionTable  {
     for (Iterator it=r.iterator(); it.hasNext();) {
       Connection con = (Connection)it.next();
       if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder()
-          && member.equals(con.getRemoteAddress())) {
+          && member.equals(con.getRemoteId())) {
         Long state = (Long)connectionStates.remove(Long.valueOf(con.getUniqueId()));
         if (state != null) {
           long count = state.longValue();
           while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) {
             if (logger.isDebugEnabled()) {
               logger.debug("Waiting for connection {}/{} currently={} need={}", 
-                  con.getRemoteAddress(), con.getUniqueId(), con.getMessagesReceived(), count);
+                  con.getRemoteId(), con.getUniqueId(), con.getMessagesReceived(), count);
             }
             Thread.sleep(100);
           }
@@ -1238,11 +1230,11 @@ public class ConnectionTable  {
     /**
      * the stub we are connecting to
      */
-    private final DistributedMember id;
+    private final Stub id;
     
     private final Thread connectingThread;
     
-    public PendingConnection(boolean preserveOrder, DistributedMember id) {
+    public PendingConnection(boolean preserveOrder, Stub id) {
       this.preserveOrder = preserveOrder;
       this.id = id;
       this.connectingThread = Thread.currentThread();
@@ -1287,9 +1279,10 @@ public class ConnectionTable  {
 
       boolean severeAlertIssued = false;
       boolean suspected = false;
-      DistributedMember targetMember = null;
+      InternalDistributedMember targetMember = null;
       if (ackSATimeout > 0) {
-        targetMember = this.id;
+        targetMember =
+          ((GMSMembershipManager)mgr).getMemberForStub(this.id, false);
       }
 
       for (;;) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java
index a954814..5cd426f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java
@@ -18,7 +18,6 @@
 package com.gemstone.gemfire.internal.tcp;
 
 import com.gemstone.gemfire.GemFireException;
-import com.gemstone.gemfire.distributed.DistributedMember;
 
 /**
  * MemberShunnedException may be thrown to prevent ack-ing a message
@@ -29,13 +28,13 @@ import com.gemstone.gemfire.distributed.DistributedMember;
 public class MemberShunnedException extends GemFireException
 {
   private static final long serialVersionUID = -8453126202477831557L;
-  private DistributedMember member;
+  private Stub member;
   
   /**
    * constructor
    * @param member the member that was shunned
    */
-  public MemberShunnedException(DistributedMember member) {
+  public MemberShunnedException(Stub member) {
     super("");
     this.member = member;
   }
@@ -43,7 +42,7 @@ public class MemberShunnedException extends GemFireException
   /**
    * @return the member that was shunned
    */
-  public DistributedMember getShunnedMember() {
+  public Stub getShunnedMember() {
     return this.member;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java
index cd711e7..fd495d9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java
@@ -16,7 +16,6 @@
  */
 package com.gemstone.gemfire.internal.tcp;
 
-import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
 import com.gemstone.gemfire.distributed.internal.membership.*;
 import com.gemstone.gemfire.i18n.LogWriterI18n;
@@ -35,7 +34,7 @@ import com.gemstone.gemfire.i18n.LogWriterI18n;
 public interface ServerDelegate {
 
   public void receive( DistributionMessage message, int bytesRead,
-                       DistributedMember connId );
+                       Stub connId );
 
   public LogWriterI18n getLogger();
 
@@ -43,5 +42,5 @@ public interface ServerDelegate {
    * Called when a possibly new member is detected by receiving a direct channel
    * message from him.
    */
-  public void newMemberConnected(InternalDistributedMember member);
+  public void newMemberConnected(InternalDistributedMember member, Stub id);
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java
new file mode 100644
index 0000000..2e4b91b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.tcp;
+
+import java.io.*;
+import java.net.*;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+
+/** Stub represents an ip address and port.
+
+    @author Bruce Schuchardt
+    @since 2.0
+   
+ */
+
+public class Stub implements Externalizable, DataSerializable
+{
+  private InetAddress inAddr;
+  private int port;
+  private int viewID;
+
+  public Stub() {
+    // public default needed for deserialization
+  }
+  
+  public Stub(InetAddress addr, int port, int vmViewID) {
+    viewID = vmViewID;
+    inAddr = addr;
+    this.port = port;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o instanceof Stub) {
+      Stub s = (Stub)o;
+      boolean result;
+      if (inAddr == null)
+        result = s.inAddr == null;
+      else
+        result = inAddr.equals(s.inAddr);
+      result = result && port == s.port;
+      if (this.viewID != 0 && s.viewID != 0) {
+        result = result && (this.viewID == s.viewID);
+      }
+      return result;
+    }
+    else {
+      return false;
+    }
+  }
+  
+  // hashCode equates to the address hashCode for fast connection lookup
+  @Override
+  public int hashCode() {
+    // do not use viewID in hashCode because it is changed after creating a stub
+    int result = 0;
+    // result += inAddr.hashCode(); // useless
+    result += port;
+    return result;
+  }
+  
+  public void setViewID(int viewID) {
+    this.viewID = viewID;
+  }
+  
+  public int getPort() {
+    return port;
+  }
+  
+  public int getViewID() {
+    return this.viewID;
+  }
+  
+  public InetAddress getInetAddress() {
+    return inAddr;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(80);
+    sb.append("tcp://");
+    if (inAddr == null)
+      sb.append("<null>");
+    else
+      sb.append(inAddr.toString());
+    if (this.viewID != 0) {
+      sb.append("<v"+this.viewID+">");
+    }
+    sb.append(":" + port);
+    return sb.toString();
+  }
+  
+  /**
+   * Writes the contents of this <code>Stub</code> to a
+   * <code>DataOutput</code>. 
+   *
+   * @since 3.0
+   */
+  public void toData(DataOutput out) 
+    throws IOException
+  {
+    DataSerializer.writeInetAddress(inAddr, out);
+    out.writeInt(port);
+    out.writeInt(viewID);
+  }
+  
+  /**
+   * Reads the contents of this <code>Stub</code> from a
+   * <code>DataOutput</code>. 
+   *
+   * @since 3.0
+   */
+  public void fromData(DataInput in)
+    throws IOException, ClassNotFoundException
+  {
+    inAddr = DataSerializer.readInetAddress(in);
+    this.port = in.readInt();
+    this.viewID = in.readInt();
+  }
+
+  /**
+   * static factory method
+   * @since 5.0.2
+   */
+  public static Stub createFromData(DataInput in)
+    throws IOException, ClassNotFoundException
+  {
+    Stub result = new Stub();
+    InternalDataSerializer.invokeFromData(result, in);
+    return result;
+  }
+  
+  public void writeExternal(ObjectOutput os) 
+    throws IOException
+  {
+    this.toData(os);
+  }
+  
+  public void readExternal(ObjectInput is)
+    throws IOException, ClassNotFoundException
+  {
+    this.fromData(is);
+  }
+}


Mime
View raw message