geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [3/8] incubator-geode git commit: GEODE-420: Clean up of SocketCreator code in tests. SocketCreatorFactory currently singleton, to amend at later stage
Date Tue, 12 Jul 2016 20:01:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
index ed570c1..966de54 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
@@ -16,66 +16,90 @@
  */
 package com.gemstone.gemfire.internal.tcp;
 
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLException;
+
+import org.apache.logging.log4j.Logger;
+
 import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
-import com.gemstone.gemfire.distributed.internal.*;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.LonerDistributionManager;
 import com.gemstone.gemfire.distributed.internal.direct.DirectChannel;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
-import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import org.apache.logging.log4j.Logger;
-
-import javax.net.ssl.SSLException;
-import java.io.IOException;
-import java.net.*;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.*;
-
-/** <p>TCPConduit manages a server socket and a collection of connections to
-    other systems.  Connections are identified by DistributedMember IDs.
-    These types of messages are currently supported:</p><pre>
-
-       DistributionMessage - message is delivered to the server's
-                             ServerDelegate
-
-    </pre>
-    <p>In the current implementation, ServerDelegate is the DirectChannel
-    used by the GemFire DistributionManager to send and receive messages.<p>
-    If the ServerDelegate is null, DistributionMessages are ignored by
-    the TCPConduit.</p>
-
-    @since GemFire 2.0
-   
-*/
+import com.gemstone.gemfire.internal.net.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
+
+/**
+ * <p>TCPConduit manages a server socket and a collection of connections to
+ * other systems.  Connections are identified by DistributedMember IDs.
+ * These types of messages are currently supported:</p><pre>
+ *
+ * DistributionMessage - message is delivered to the server's
+ * ServerDelegate
+ *
+ * </pre>
+ * <p>In the current implementation, ServerDelegate is the DirectChannel
+ * used by the GemFire DistributionManager to send and receive messages.</p>
+ * <p>If the ServerDelegate is null, DistributionMessages are ignored by
+ * the TCPConduit.</p>
+ * @since GemFire 2.0
+ */
 
 public class TCPConduit implements Runnable {
+
   private static final Logger logger = LogService.getLogger();
 
-  /** max amount of time (ms) to wait for listener threads to stop */
+  /**
+   * max amount of time (ms) to wait for listener threads to stop
+   */
   private static int LISTENER_CLOSE_TIMEOUT;
 
-  /** backlog is the "accept" backlog configuration parameter all
-      conduits server socket */
+  /**
+   * backlog is the "accept" backlog configuration parameter for all
+   * conduits' server sockets
+   */
   private static int BACKLOG;
-  
-  /** use javax.net.ssl.SSLServerSocketFactory? */
+
+  /**
+   * use javax.net.ssl.SSLServerSocketFactory?
+   */
   static boolean useSSL;
 
-//   public final static boolean USE_SYNC_WRITES = Boolean.getBoolean("p2p.useSyncWrites");
+  //   public final static boolean USE_SYNC_WRITES = Boolean.getBoolean("p2p.useSyncWrites");
 
   /**
    * Force use of Sockets rather than SocketChannels (NIO).  Note from Bruce: due to
@@ -84,18 +108,22 @@ public class TCPConduit implements Runnable {
    */
   private static boolean USE_NIO;
 
-  /** use direct ByteBuffers instead of heap ByteBuffers for NIO operations */
+  /**
+   * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
+   */
   static boolean useDirectBuffers;
-  
+
   private volatile boolean inhibitNewConnections;
 
-//  private transient DistributedMembershipListener messageReceiver;
-  
+  //  private transient DistributedMembershipListener messageReceiver;
+
   private MembershipManager membershipManager;
-  
-  /** true if NIO can be used for the server socket */
+
+  /**
+   * true if NIO can be used for the server socket
+   */
   private boolean useNIO;
-  
+
   static {
     init();
   }
@@ -107,7 +135,7 @@ public class TCPConduit implements Runnable {
   public static int getBackLog() {
     return BACKLOG;
   }
-  
+
   public static void init() {
     useSSL = Boolean.getBoolean("p2p.useSSL");
     // only use nio if not SSL
@@ -116,33 +144,45 @@ public class TCPConduit implements Runnable {
     useDirectBuffers = USE_NIO && !Boolean.getBoolean("p2p.nodirectBuffers");
     LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000).intValue();
     // fix for bug 37730
-    BACKLOG = Integer.getInteger("p2p.backlog", HANDSHAKE_POOL_SIZE+1).intValue();
+    BACKLOG = Integer.getInteger("p2p.backlog", HANDSHAKE_POOL_SIZE + 1).intValue();
   }
 
   ///////////////// permanent conduit state
 
-  /** the size of OS TCP/IP buffers, not set by default */
+  /**
+   * the size of OS TCP/IP buffers, not set by default
+   */
   public int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
   public int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
 
-  /** port is the tcp/ip port that this conduit binds to. If it is zero, a port
-      from membership-port-range is selected to bind to. The actual port number this
-      conduit is listening on will be in the "id" instance variable */
+  /**
+   * port is the tcp/ip port that this conduit binds to. If it is zero, a port
+   * from membership-port-range is selected to bind to. The actual port number this
+   * conduit is listening on will be in the "id" instance variable
+   */
   private int port;
 
   private int[] tcpPortRange = new int[] { 1024, 65535 };
 
-  /** The java groups address that this conduit is associated with */
+  /**
+   * The java groups address that this conduit is associated with
+   */
   private InternalDistributedMember localAddr;
-  
-  /** address is the InetAddress that this conduit uses for identity */
+
+  /**
+   * address is the InetAddress that this conduit uses for identity
+   */
   private final InetAddress address;
-  
-  /** isBindAddress is true if we should bind to the address */
+
+  /**
+   * isBindAddress is true if we should bind to the address
+   */
   private final boolean isBindAddress;
 
-  /** the object that receives DistributionMessage messages
-      received by this conduit. */
+  /**
+   * the object that receives DistributionMessage messages
+   * received by this conduit.
+   */
   private final DirectChannel directChannel;
   /**
    * Stats from the delegate
@@ -154,42 +194,49 @@ public class TCPConduit implements Runnable {
    * @since GemFire 4.2.1
    */
   DistributionConfig config;
-  
+
   ////////////////// runtime state that is re-initialized on a restart
 
-  /** server socket address */
+  /**
+   * server socket address
+   */
   private InetSocketAddress id;
 
   protected volatile boolean stopped;
 
-  /** the listener thread */
+  /**
+   * the listener thread
+   */
   private Thread thread;
 
-  /** if using NIO, this is the object used for accepting connections */
+  /**
+   * if using NIO, this is the object used for accepting connections
+   */
   private ServerSocketChannel channel;
 
-  /** the server socket */
+  /**
+   * the server socket
+   */
   private ServerSocket socket;
 
-  /** a table of Connections from this conduit to others
+  /**
+   * a table of Connections from this conduit to others
    */
   private ConnectionTable conTable;
 
-  /** <p>creates a new TCPConduit bound to the given InetAddress and port.
-      The given ServerDelegate will receive any DistributionMessages
-      passed to the conduit.</p>
-      <p>This constructor forces the conduit to ignore the following
-      system properties and look for them only in the <i>props</i> argument:</p>
-      <pre>
-      p2p.tcpBufferSize
-      p2p.idleConnectionTimeout
-      </pre>
-  */
-  public TCPConduit(MembershipManager mgr, int port,
-      InetAddress address, boolean isBindAddress,
-      DirectChannel receiver, Properties props)
-    throws ConnectionException
-  {
+  /**
+   * <p>creates a new TCPConduit bound to the given InetAddress and port.
+   * The given ServerDelegate will receive any DistributionMessages
+   * passed to the conduit.</p>
+   * <p>This constructor forces the conduit to ignore the following
+   * system properties and look for them only in the <i>props</i> argument:</p>
+   * <pre>
+   * p2p.tcpBufferSize
+   * p2p.idleConnectionTimeout
+   * </pre>
+   */
+  public TCPConduit(MembershipManager mgr, int port, InetAddress address, boolean isBindAddress, DirectChannel receiver, Properties props)
+    throws ConnectionException {
     parseProperties(props);
 
     this.address = address;
@@ -209,8 +256,7 @@ public class TCPConduit implements Runnable {
 
     try {
       this.conTable = ConnectionTable.create(this);
-    }
-    catch (IOException io) {
+    } catch (IOException io) {
       throw new ConnectionException(LocalizedStrings.TCPConduit_UNABLE_TO_INITIALIZE_CONNECTION_TABLE.toLocalizedString(), io);
     }
     this.useNIO = USE_NIO;
@@ -219,8 +265,7 @@ public class TCPConduit implements Runnable {
       if (addr == null) {
         try {
           addr = SocketCreator.getLocalHost();
-        }
-        catch (java.net.UnknownHostException e) {
+        } catch (java.net.UnknownHostException e) {
           throw new ConnectionException("Unable to resolve localHost address", e);
         }
       }
@@ -234,96 +279,109 @@ public class TCPConduit implements Runnable {
         }
       }
     }
-      
+
     startAcceptor();
   }
 
 
-  /** parse instance-level properties from the given object */
+  /**
+   * parse instance-level properties from the given object
+   */
   private void parseProperties(Properties p) {
     if (p != null) {
       String s;
-      s = p.getProperty("p2p.tcpBufferSize", ""+tcpBufferSize);
-      try { tcpBufferSize = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE), e); }
+      s = p.getProperty("p2p.tcpBufferSize", "" + tcpBufferSize);
+      try {
+        tcpBufferSize = Integer.parseInt(s);
+      } catch (Exception e) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE), e);
+      }
       if (tcpBufferSize < Connection.SMALL_BUFFER_SIZE) {
         // enforce minimum
         tcpBufferSize = Connection.SMALL_BUFFER_SIZE;
       }
-      s = p.getProperty("p2p.idleConnectionTimeout", ""+idleConnectionTimeout);
-      try { idleConnectionTimeout = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT), e); }
-      
+      s = p.getProperty("p2p.idleConnectionTimeout", "" + idleConnectionTimeout);
+      try {
+        idleConnectionTimeout = Integer.parseInt(s);
+      } catch (Exception e) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT), e);
+      }
+
       s = p.getProperty("membership_port_range_start");
-      try { tcpPortRange[0] = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGESTART), e); }
-      
+      try {
+        tcpPortRange[0] = Integer.parseInt(s);
+      } catch (Exception e) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGESTART), e);
+      }
+
       s = p.getProperty("membership_port_range_end");
-      try { tcpPortRange[1] = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGEEND), e); }
-      
+      try {
+        tcpPortRange[1] = Integer.parseInt(s);
+      } catch (Exception e) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGEEND), e);
+      }
+
     }
   }
 
   private ThreadPoolExecutor hsPool;
 
-  /** the reason for a shutdown, if abnormal */
+  /**
+   * the reason for a shutdown, if abnormal
+   */
   private volatile Exception shutdownCause;
 
   private final static int HANDSHAKE_POOL_SIZE = Integer.getInteger("p2p.HANDSHAKE_POOL_SIZE", 10).intValue();
   private final static long HANDSHAKE_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.HANDSHAKE_POOL_KEEP_ALIVE_TIME", 60).longValue();
 
-  /** added to fix bug 40436 */
+  /**
+   * added to fix bug 40436
+   */
   public void setMaximumHandshakePoolSize(int maxSize) {
     if (this.hsPool != null && maxSize > HANDSHAKE_POOL_SIZE) {
       this.hsPool.setMaximumPoolSize(maxSize);
     }
   }
 
-  /** binds the server socket and gets threads going
-   *
-   * */
+  /**
+   * binds the server socket and gets threads going
+   */
   private void startAcceptor() throws ConnectionException {
     int localPort;
     int p = this.port;
     InetAddress ba = this.address;
-    
+
     {
       ThreadPoolExecutor tmp_hsPool = null;
       String gName = "P2P-Handshaker " + ba + ":" + p;
-      final ThreadGroup socketThreadGroup
-        = LoggingThreadGroup.createThreadGroup(gName, logger);
-                                          
+      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
       ThreadFactory socketThreadFactory = new ThreadFactory() {
-          int connNum = -1;
+        int connNum = -1;
 
-          public Thread newThread(Runnable command) {
-            int tnum;
-            synchronized (this) {
-              tnum = ++connNum;
-            }
-            String tName = socketThreadGroup.getName() + " Thread " + tnum;
-            return new Thread(socketThreadGroup, command, tName);
+        public Thread newThread(Runnable command) {
+          int tnum;
+          synchronized (this) {
+            tnum = ++connNum;
           }
-        };
+          String tName = socketThreadGroup.getName() + " Thread " + tnum;
+          return new Thread(socketThreadGroup, command, tName);
+        }
+      };
       try {
         final BlockingQueue bq = new SynchronousQueue();
         final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
-            public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
-              try {
-                bq.put(r);
-              }
-              catch (InterruptedException ex) {
-                Thread.currentThread().interrupt(); // preserve the state
-                throw new RejectedExecutionException(LocalizedStrings.TCPConduit_INTERRUPTED.toLocalizedString(), ex);
-              }
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+            try {
+              bq.put(r);
+            } catch (InterruptedException ex) {
+              Thread.currentThread().interrupt(); // preserve the state
+              throw new RejectedExecutionException(LocalizedStrings.TCPConduit_INTERRUPTED.toLocalizedString(), ex);
             }
-          };
-        tmp_hsPool = new ThreadPoolExecutor(1,
-                                            HANDSHAKE_POOL_SIZE,
-                                            HANDSHAKE_POOL_KEEP_ALIVE_TIME,
-                                            TimeUnit.SECONDS,
-                                            bq,
-                                            socketThreadFactory,
-                                            reh);
-      }
-      catch (IllegalArgumentException poolInitException) {
+          }
+        };
+        tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, HANDSHAKE_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS, bq, socketThreadFactory, reh);
+      } catch (IllegalArgumentException poolInitException) {
         throw new ConnectionException(LocalizedStrings.TCPConduit_WHILE_CREATING_HANDSHAKE_POOL.toLocalizedString(), poolInitException);
       }
       this.hsPool = tmp_hsPool;
@@ -334,44 +392,43 @@ public class TCPConduit implements Runnable {
 
       id = new InetSocketAddress(socket.getInetAddress(), localPort);
       stopped = false;
-      ThreadGroup group =
-        LoggingThreadGroup.createThreadGroup("P2P Listener Threads", logger);
+      ThreadGroup group = LoggingThreadGroup.createThreadGroup("P2P Listener Threads", logger);
       thread = new Thread(group, this, "P2P Listener Thread " + id);
       thread.setDaemon(true);
-      try { thread.setPriority(thread.getThreadGroup().getMaxPriority()); }
-      catch (Exception e) {
+      try {
+        thread.setPriority(thread.getThreadGroup().getMaxPriority());
+      } catch (Exception e) {
         logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_UNABLE_TO_SET_LISTENER_PRIORITY__0, e.getMessage()));
       }
       if (!Boolean.getBoolean("p2p.test.inhibitAcceptor")) {
         thread.start();
-      }
-      else {
+      } else {
         logger.fatal(LocalizedMessage.create(LocalizedStrings.TCPConduit_INHIBITACCEPTOR));
         socket.close();
         this.hsPool.shutdownNow();
       }
-    }
-    catch (IOException io) {
+    } catch (IOException io) {
       String s = "While creating ServerSocket on port " + p;
       throw new ConnectionException(s, io);
     }
     this.port = localPort;
   }
-  
-  /** creates the server sockets.  This can be used to recreate the
-   *  socket using this.port and this.bindAddress, which must be set
-   *  before invoking this method.
+
+  /**
+   * creates the server sockets.  This can be used to recreate the
+   * socket using this.port and this.address, which must be set
+   * before invoking this method.
    */
   private void createServerSocket() {
     int p = this.port;
     int b = BACKLOG;
     InetAddress ba = this.address;
-    
+
     try {
       if (this.useNIO) {
         if (p <= 0) {
-          socket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(ba, b, isBindAddress,
-                    this.useNIO, 0, tcpPortRange);
+
+          socket = SocketCreatorFactory.getClusterSSLSocketCreator().createServerSocketUsingPortRange(ba, b, isBindAddress, this.useNIO, 0, tcpPortRange);
         } else {
           ServerSocketChannel channl = ServerSocketChannel.open();
           socket = channl.socket();
@@ -387,92 +444,91 @@ public class TCPConduit implements Runnable {
             socket.setReceiveBufferSize(tcpBufferSize);
             int newSize = socket.getReceiveBufferSize();
             if (newSize != tcpBufferSize) {
-              logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
-                  new Object[] {"Listener receiverBufferSize", Integer.valueOf(newSize), Integer.valueOf(tcpBufferSize)}));
+              logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2, new Object[] {
+                "Listener receiverBufferSize",
+                Integer.valueOf(newSize),
+                Integer.valueOf(tcpBufferSize)
+              }));
             }
           } catch (SocketException ex) {
             logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_SET_LISTENER_RECEIVERBUFFERSIZE_TO__0, tcpBufferSize));
           }
         }
         channel = socket.getChannel();
-      }
-      else {
+      } else {
         try {
           if (p <= 0) {
-            socket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(ba, b, isBindAddress,
-                      this.useNIO, this.tcpBufferSize, tcpPortRange);
+            socket = SocketCreatorFactory.getClusterSSLSocketCreator()
+                                         .createServerSocketUsingPortRange(ba, b, isBindAddress, this.useNIO, this.tcpBufferSize, tcpPortRange);
           } else {
-            socket = SocketCreator.getDefaultInstance().createServerSocket(p, b, isBindAddress? ba : null, this.tcpBufferSize);
+            socket = SocketCreatorFactory.getClusterSSLSocketCreator().createServerSocket(p, b, isBindAddress ? ba : null, this.tcpBufferSize);
           }
           int newSize = socket.getReceiveBufferSize();
           if (newSize != this.tcpBufferSize) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
-                new Object[] {"Listener receiverBufferSize", Integer.valueOf(newSize), Integer.valueOf(this.tcpBufferSize)}));
+            logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2, new Object[] {
+              "Listener receiverBufferSize",
+              Integer.valueOf(newSize),
+              Integer.valueOf(this.tcpBufferSize)
+            }));
           }
         } catch (SocketException ex) {
           logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_SET_LISTENER_RECEIVERBUFFERSIZE_TO__0, this.tcpBufferSize));
-          
+
         }
       }
       port = socket.getLocalPort();
-    }
-    catch (IOException io) {
-      throw new ConnectionException( LocalizedStrings.TCPConduit_EXCEPTION_CREATING_SERVERSOCKET.toLocalizedString(
-              new Object[] {Integer.valueOf(p), ba}), io);
+    } catch (IOException io) {
+      throw new ConnectionException(LocalizedStrings.TCPConduit_EXCEPTION_CREATING_SERVERSOCKET.toLocalizedString(new Object[] { Integer.valueOf(p), ba }), io);
     }
   }
 
   /**
    * Ensure that the ConnectionTable class gets loaded.
-   * 
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
     ConnectionTable.loadEmergencyClasses();
   }
-  
+
   /**
    * Close the ServerSocketChannel, ServerSocket, and the
    * ConnectionTable.
-   * 
    * @see SystemFailure#emergencyClose()
    */
   public void emergencyClose() {
-//    stop(); // Causes grief
+    //    stop(); // Causes grief
     if (stopped) {
       return;
     }
-    
+
     stopped = true;
 
-//    System.err.println("DEBUG: TCPConduit emergencyClose");
+    //    System.err.println("DEBUG: TCPConduit emergencyClose");
     try {
       if (channel != null) {
         channel.close();
         // NOTE: do not try to interrupt the listener thread at this point.
         // Doing so interferes with the channel's socket logic.
-      }
-      else {
+      } else {
         if (socket != null) {
           socket.close();
         }
       }
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       // ignore, please!
     }
 
     // this.hsPool.shutdownNow(); // I don't trust this not to allocate objects or to synchronize
-//  this.conTable.close(); not safe against deadlocks
+    //  this.conTable.close(); not safe against deadlocks
     ConnectionTable.emergencyClose();
-    
+
     socket = null;
     thread = null;
     conTable = null;
-    
-//    System.err.println("DEBUG: end of TCPConduit emergencyClose");
+
+    //    System.err.println("DEBUG: end of TCPConduit emergencyClose");
   }
-  
+
   /* stops the conduit, closing all tcp/ip connections */
   public void stop(Exception cause) {
     if (!stopped) {
@@ -486,13 +542,13 @@ public class TCPConduit implements Runnable {
         // set timeout endpoint here since interrupt() has been known
         // to hang
         long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
-        Thread t = this.thread;;
+        Thread t = this.thread;
+        ;
         if (channel != null) {
           channel.close();
           // NOTE: do not try to interrupt the listener thread at this point.
           // Doing so interferes with the channel's socket logic.
-        }
-        else {
+        } else {
           ServerSocket s = this.socket;
           if (s != null) {
             s.close();
@@ -501,7 +557,7 @@ public class TCPConduit implements Runnable {
             t.interrupt();
           }
         }
-        
+
         do {
           t = this.thread;
           if (t == null || !t.isAlive()) {
@@ -511,15 +567,13 @@ public class TCPConduit implements Runnable {
         } while (timeout > System.currentTimeMillis());
 
         if (t != null && t.isAlive()) {
-          logger.warn(LocalizedMessage.create(
-              LocalizedStrings.TCPConduit_UNABLE_TO_SHUT_DOWN_LISTENER_WITHIN_0_MS_UNABLE_TO_INTERRUPT_SOCKET_ACCEPT_DUE_TO_JDK_BUG_GIVING_UP,
-              Integer.valueOf(LISTENER_CLOSE_TIMEOUT)));
+          logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_UNABLE_TO_SHUT_DOWN_LISTENER_WITHIN_0_MS_UNABLE_TO_INTERRUPT_SOCKET_ACCEPT_DUE_TO_JDK_BUG_GIVING_UP, Integer
+            .valueOf(LISTENER_CLOSE_TIMEOUT)));
         }
       } catch (IOException e) {
       } catch (InterruptedException e) {
         // Ignore, we're trying to stop already.
-      }
-      finally {
+      } finally {
         this.hsPool.shutdownNow();
       }
 
@@ -534,18 +588,20 @@ public class TCPConduit implements Runnable {
 
   /**
    * Returns whether or not this conduit is stopped
-   *
    * @since GemFire 3.0
    */
   public boolean isStopped() {
     return this.stopped;
   }
 
-  /** starts the conduit again after it's been stopped.  This will clear the
-      server map if the conduit's port is zero (wildcard bind) */
+  /**
+   * starts the conduit again after it's been stopped.  This will clear the
+   * server map if the conduit's port is zero (wildcard bind)
+   */
   public void restart() throws ConnectionException {
-    if (!stopped)
+    if (!stopped) {
       return;
+    }
     this.stats = null;
     if (directChannel != null) {
       this.stats = directChannel.getDMStats();
@@ -555,20 +611,21 @@ public class TCPConduit implements Runnable {
     }
     try {
       this.conTable = ConnectionTable.create(this);
-    }
-    catch (IOException io) {
+    } catch (IOException io) {
       throw new ConnectionException(LocalizedStrings.TCPConduit_UNABLE_TO_INITIALIZE_CONNECTION_TABLE.toLocalizedString(), io);
     }
     startAcceptor();
   }
 
-  /** this is the server socket listener thread's run loop */
+  /**
+   * this is the server socket listener thread's run loop
+   */
   public void run() {
     ConnectionTable.threadWantsSharedResources();
     if (logger.isTraceEnabled(LogMarker.DM)) {
       logger.trace(LogMarker.DM, "Starting P2P Listener on  {}", id);
     }
-    for(;;) {
+    for (; ; ) {
       SystemFailure.checkFailure();
       if (stopper.cancelInProgress() != null) {
         break;
@@ -586,49 +643,42 @@ public class TCPConduit implements Runnable {
       Socket othersock = null;
       try {
         if (this.useNIO) {
-          SocketChannel otherChannel = channel.accept();          
+          SocketChannel otherChannel = channel.accept();
           othersock = otherChannel.socket();
-        }
-        else {
+        } else {
           try {
             othersock = socket.accept();
-          }
-          catch (SSLException ex) {
+          } catch (SSLException ex) {
             // SW: This is the case when there is a problem in P2P
             // SSL configuration, so need to exit otherwise goes into an
             // infinite loop just filling the logs
-            logger.warn(LocalizedMessage.create(
-                LocalizedStrings.TCPConduit_STOPPING_P2P_LISTENER_DUE_TO_SSL_CONFIGURATION_PROBLEM), ex);
+            logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_STOPPING_P2P_LISTENER_DUE_TO_SSL_CONFIGURATION_PROBLEM), ex);
             break;
           }
-          SocketCreator.getDefaultInstance().configureServerSSLSocket(
-              othersock);
+          SocketCreatorFactory.getClusterSSLSocketCreator().configureServerSSLSocket(othersock);
         }
         if (stopped) {
           try {
             if (othersock != null) {
               othersock.close();
             }
-          }
-          catch (Exception e) {
+          } catch (Exception e) {
           }
           continue;
         }
         if (inhibitNewConnections) {
-//          if (logger.isTraceEnabled(LogMarker.QA)) {
-            logger.info("Test hook: inhibiting acceptance of connection {}", othersock);
-//          }
+          //          if (logger.isTraceEnabled(LogMarker.QA)) {
+          logger.info("Test hook: inhibiting acceptance of connection {}", othersock);
+          //          }
           othersock.close();
           while (inhibitNewConnections && !stopped) {
             this.stopper.checkCancelInProgress(null);
             boolean interrupted = Thread.interrupted();
-            try { 
-              Thread.sleep(2000); 
-            }
-            catch (InterruptedException e) {
+            try {
+              Thread.sleep(2000);
+            } catch (InterruptedException e) {
               interrupted = true;
-            }
-            finally {
+            } finally {
               if (interrupted) {
                 Thread.currentThread().interrupt();
               }
@@ -637,41 +687,32 @@ public class TCPConduit implements Runnable {
           if (logger.isTraceEnabled(LogMarker.QA)) {
             logger.trace(LogMarker.QA, "Test hook: finished inhibiting acceptance of connections");
           }
-        }
-        else {
+        } else {
           acceptConnection(othersock);
         }
-      }
-      catch (ClosedByInterruptException cbie) {
+      } catch (ClosedByInterruptException cbie) {
         //safe to ignore
-      }
-      catch (ClosedChannelException e) {
+      } catch (ClosedChannelException e) {
         break; // we're dead
-      }
-      catch (CancelException e) {
+      } catch (CancelException e) {
         break;
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         if (!stopped) {
           if (e instanceof SocketException && "Socket closed".equalsIgnoreCase(e.getMessage())) {
             // safe to ignore; see bug 31156
             if (!socket.isClosed()) {
-              logger.warn(LocalizedMessage.create(
-                  LocalizedStrings.TCPConduit_SERVERSOCKET_THREW_SOCKET_CLOSED_EXCEPTION_BUT_SAYS_IT_IS_NOT_CLOSED), e);
+              logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_SERVERSOCKET_THREW_SOCKET_CLOSED_EXCEPTION_BUT_SAYS_IT_IS_NOT_CLOSED), e);
               try {
                 socket.close();
                 createServerSocket();
-              }
-              catch (IOException ioe) {
+              } catch (IOException ioe) {
                 logger.fatal(LocalizedMessage.create(LocalizedStrings.TCPConduit_UNABLE_TO_CLOSE_AND_RECREATE_SERVER_SOCKET), ioe);
                 // post 5.1.0x, this should force shutdown
                 try {
                   Thread.sleep(5000);
-                }
-                catch (InterruptedException ie) {
+                } catch (InterruptedException ie) {
                   // Don't reset; we're just exiting the thread
-                  logger.info(LocalizedMessage.create(
-                      LocalizedStrings.TCPConduit_INTERRUPTED_AND_EXITING_WHILE_TRYING_TO_RECREATE_LISTENER_SOCKETS));
+                  logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_INTERRUPTED_AND_EXITING_WHILE_TRYING_TO_RECREATE_LISTENER_SOCKETS));
                   return;
                 }
               }
@@ -693,8 +734,7 @@ public class TCPConduit implements Runnable {
         logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_SERVERSOCKET_CLOSED_REOPENING));
         try {
           createServerSocket();
-        }
-        catch (ConnectionException ex) {
+        } catch (ConnectionException ex) {
           logger.warn(ex.getMessage(), ex);
         }
       }
@@ -708,19 +748,18 @@ public class TCPConduit implements Runnable {
   private void acceptConnection(final Socket othersock) {
     try {
       this.hsPool.execute(new Runnable() {
-          public void run() {
-            basicAcceptConnection(othersock);
-          }
-        });
-    }
-    catch (RejectedExecutionException rejected) {
+        public void run() {
+          basicAcceptConnection(othersock);
+        }
+      });
+    } catch (RejectedExecutionException rejected) {
       try {
         othersock.close();
-      }
-      catch (IOException ignore) {
+      } catch (IOException ignore) {
       }
     }
   }
+
   private ConnectionTable getConTable() {
     ConnectionTable result = this.conTable;
     if (result == null) {
@@ -729,41 +768,40 @@ public class TCPConduit implements Runnable {
     }
     return result;
   }
+
   protected void basicAcceptConnection(Socket othersock) {
     try {
       getConTable().acceptConnection(othersock);
-    }
-    catch (IOException io) {
+    } catch (IOException io) {
       // exception is logged by the Connection
       if (!stopped) {
         this.stats.incFailedAccept();
       }
-    }
-    catch (ConnectionException ex) {
+    } catch (ConnectionException ex) {
       // exception is logged by the Connection
       if (!stopped) {
         this.stats.incFailedAccept();
       }
-    }
-    catch (CancelException e) {
-    }
-    catch (Exception e) {
+    } catch (CancelException e) {
+    } catch (Exception e) {
       if (!stopped) {
-//        if (e instanceof SocketException
-//            && "Socket closed".equals(e.getMessage())) {
-//          // safe to ignore; see bug 31156
-//        } 
-//        else 
+        //        if (e instanceof SocketException
+        //            && "Socket closed".equals(e.getMessage())) {
+        //          // safe to ignore; see bug 31156
+        //        }
+        //        else
         {
           this.stats.incFailedAccept();
-          logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
-            new Object[] {othersock.getInetAddress(), e}), e);
+          logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1, new Object[] {
+            othersock.getInetAddress(),
+            e
+          }), e);
         }
       }
       //connections.cleanupLowWater();
     }
   }
-  
+
   /**
    * return true if "new IO" classes are being used for the server socket
    */
@@ -776,22 +814,17 @@ public class TCPConduit implements Runnable {
    * ordered connections
    * @since GemFire 5.1
    */
-  public void getThreadOwnedOrderedConnectionState(
-    DistributedMember member,
-    Map result)
-  {
+  public void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
     getConTable().getThreadOwnedOrderedConnectionState(member, result);
   }
-  
+
   /**
    * wait for the incoming connections identified by the keys in the
    * argument to receive and dispatch the number of messages associated
    * with the key
    * @since GemFire 5.1
    */
-  public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState)
-    throws InterruptedException
-  {
+  public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState) throws InterruptedException {
     // if (Thread.interrupted()) throw new InterruptedException(); not necessary done in waitForThreadOwnedOrderedConnectionState
     getConTable().waitForThreadOwnedOrderedConnectionState(member, channelState);
   }
@@ -800,9 +833,8 @@ public class TCPConduit implements Runnable {
    * connections send messageReceived when a message object has been
    * read.
    * @param bytesRead number of bytes read off of network to get this message
-  */
-  protected void messageReceived(Connection receiver, DistributionMessage message,
-                                 int bytesRead) {
+   */
+  protected void messageReceived(Connection receiver, DistributionMessage message, int bytesRead) {
     if (logger.isTraceEnabled()) {
       logger.trace("{} received {} from {}", id, message, receiver);
     }
@@ -816,12 +848,16 @@ public class TCPConduit implements Runnable {
     }
   }
 
-  /** gets the address of this conduit's ServerSocket endpoint */
+  /**
+   * gets the address of this conduit's ServerSocket endpoint
+   */
   public InetSocketAddress getId() {
     return id;
   }
 
-  /** gets the actual port to which this conduit's ServerSocket is bound */
+  /**
+   * gets the actual port to which this conduit's ServerSocket is bound
+   */
   public int getPort() {
     return id.getPort();
   }
@@ -832,14 +868,19 @@ public class TCPConduit implements Runnable {
   public InternalDistributedMember getLocalAddress() {
     return this.localAddr;
   }
-  /** gets the requested port that this TCPConduit bound to.  This could
-      be zero if a wildcard bind was done */
+
+  /**
+   * gets the requested port that this TCPConduit bound to.  This could
+   * be zero if a wildcard bind was done
+   */
   public int getBindPort() {
     return port;
   }
 
-  
-  /** gets the channel that is used to process non-DistributedMember messages */
+
+  /**
+   * gets the channel that is used to process non-DistributedMember messages
+   */
   public DirectChannel getDirectChannel() {
     return directChannel;
   }
@@ -847,7 +888,7 @@ public class TCPConduit implements Runnable {
   public void setLocalAddr(InternalDistributedMember addr) {
     localAddr = addr;
   }
-  
+
   public InternalDistributedMember getLocalAddr() {
     return localAddr;
   }
@@ -856,19 +897,21 @@ public class TCPConduit implements Runnable {
    * Return a connection to the given member.   This method must continue
    * to attempt to create a connection to the given member as long as that
    * member is in the membership view and the system is not shutting down.
-   * 
    * @param memberAddress the IDS associated with the remoteId
    * @param preserveOrder whether this is an ordered or unordered connection
    * @param retry false if this is the first attempt
    * @param startTime the time this operation started
    * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero)
    * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted (or zero)
+   *
    * @return the connection
    */
-  public Connection getConnection(InternalDistributedMember memberAddress, final boolean preserveOrder, boolean retry, long startTime,
-      long ackTimeout, long ackSATimeout)
-    throws java.io.IOException, DistributedSystemDisconnectedException
-  {
+  public Connection getConnection(InternalDistributedMember memberAddress,
+                                  final boolean preserveOrder,
+                                  boolean retry,
+                                  long startTime,
+                                  long ackTimeout,
+                                  long ackSATimeout) throws java.io.IOException, DistributedSystemDisconnectedException {
     //final boolean preserveOrder = (processorType == DistributionManager.SERIAL_EXECUTOR )|| (processorType == DistributionManager.PARTITIONED_REGION_EXECUTOR);
     if (stopped) {
       throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_THE_CONDUIT_IS_STOPPED.toLocalizedString());
@@ -877,181 +920,166 @@ public class TCPConduit implements Runnable {
     Connection conn = null;
     InternalDistributedMember memberInTrouble = null;
     boolean breakLoop = false;
-    for (;;) {
+    for (; ; ) {
       stopper.checkCancelInProgress(null);
       boolean interrupted = Thread.interrupted();
       try {
-      // If this is the second time through this loop, we had
-      // problems.  Tear down the connection so that it gets
-      // rebuilt.
-      if (retry || conn != null) { // not first time in loop
-        if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress) || membershipManager.shutdownInProgress()) {
-          throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
-        }
-        // bug35953: Member is still in view; we MUST NOT give up!
-        
-        // Pause just a tiny bit...
-        try {
-          Thread.sleep(100);
-        }
-        catch (InterruptedException e) {
-          interrupted = true;
-          stopper.checkCancelInProgress(e);
-        }
-        
-        // try again after sleep
-        if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) {
-          // OK, the member left.  Just register an error.
-          throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
-        }
-        
-        // Print a warning (once)
-        if (memberInTrouble == null) {
-          memberInTrouble = memberAddress;
-          logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ATTEMPTING_TCPIP_RECONNECT_TO__0, memberInTrouble));
-        }
-        else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Attempting TCP/IP reconnect to {}", memberInTrouble);
+        // If this is the second time through this loop, we had
+        // problems.  Tear down the connection so that it gets
+        // rebuilt.
+        if (retry || conn != null) { // not first time in loop
+          if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress) || membershipManager.shutdownInProgress()) {
+            throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
           }
-        }
-        
-        // Close the connection (it will get rebuilt later).
-        this.stats.incReconnectAttempts();
-        if (conn != null) {
-          try { 
-            if (logger.isDebugEnabled()) {
-              logger.debug("Closing old connection.  conn={} before retrying. memberInTrouble={}",
-                  conn, memberInTrouble);
-            }
-            conn.closeForReconnect("closing before retrying"); 
-          } 
-          catch (CancelException ex) {
-            throw ex;
+          // bug35953: Member is still in view; we MUST NOT give up!
+
+          // Pause just a tiny bit...
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            interrupted = true;
+            stopper.checkCancelInProgress(e);
           }
-          catch (Exception ex) {
+
+          // try again after sleep
+          if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) {
+            // OK, the member left.  Just register an error.
+            throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
           }
-        }
-      } // not first time in loop
-      
-      Exception problem = null;
-      try {
-        // Get (or regenerate) the connection
-        // bug36202: this could generate a ConnectionException, so it
-        // must be caught and retried
-        boolean retryForOldConnection;
-        boolean debugRetry = false;
-        do {
-          retryForOldConnection = false;
-          conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, ackSATimeout);
-          if (conn == null) {
-            // conduit may be closed - otherwise an ioexception would be thrown
-            problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(memberAddress));
-          } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
+
+          // Print a warning (once)
+          if (memberInTrouble == null) {
+            memberInTrouble = memberAddress;
+            logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ATTEMPTING_TCPIP_RECONNECT_TO__0, memberInTrouble));
+          } else {
             if (logger.isDebugEnabled()) {
-              logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn, conn.hashCode());
+              logger.debug("Attempting TCP/IP reconnect to {}", memberInTrouble);
             }
-            conn.closeOldConnection("closing old connection"); 
-            conn = null;
-            retryForOldConnection = true;
-            debugRetry = true;
           }
-        } while (retryForOldConnection);
-        if (debugRetry && logger.isDebugEnabled()) {
-          logger.debug("Done removing old connections");
-        }
 
-        // we have a connection; fall through and return it
-      }
-      catch (ConnectionException e) {
-        // Race condition between acquiring the connection and attempting
-        // to use it: another thread closed it.
-        problem = e;
-        // [sumedh] No need to retry since Connection.createSender has already
-        // done retries and now member is really unreachable for some reason
-        // even though it may be in the view
-        breakLoop = true;
-      }
-      catch (IOException e) {
-        problem = e;
-        // bug #43962 don't keep trying to connect to an alert listener
-        if (AlertAppender.isThreadAlerting()) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Giving up connecting to alert listener {}", memberAddress);
+          // Close the connection (it will get rebuilt later).
+          this.stats.incReconnectAttempts();
+          if (conn != null) {
+            try {
+              if (logger.isDebugEnabled()) {
+                logger.debug("Closing old connection.  conn={} before retrying. memberInTrouble={}", conn, memberInTrouble);
+              }
+              conn.closeForReconnect("closing before retrying");
+            } catch (CancelException ex) {
+              throw ex;
+            } catch (Exception ex) {
+            }
           }
-          breakLoop = true;
-        }
-      }
+        } // not first time in loop
 
-      if (problem != null) {
-        // Some problems are not recoverable; check and error out early.
-        if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) { // left the view
-          // Bracket our original warning
-          if (memberInTrouble != null) {
-            // make this msg info to bracket warning
-            logger.info(LocalizedMessage.create(
-                LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED, memberInTrouble));
-          }
-          throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(memberAddress));
-        } // left the view
-
-        if (membershipManager.shutdownInProgress()) { // shutdown in progress
-          // Bracket our original warning
-          if (memberInTrouble != null) {
-            // make this msg info to bracket warning
-            logger.info(LocalizedMessage.create(
-                LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED, memberInTrouble));
+        Exception problem = null;
+        try {
+          // Get (or regenerate) the connection
+          // bug36202: this could generate a ConnectionException, so it
+          // must be caught and retried
+          boolean retryForOldConnection;
+          boolean debugRetry = false;
+          do {
+            retryForOldConnection = false;
+            conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, ackSATimeout);
+            if (conn == null) {
+              // conduit may be closed - otherwise an ioexception would be thrown
+              problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(memberAddress));
+            } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn, conn.hashCode());
+              }
+              conn.closeOldConnection("closing old connection");
+              conn = null;
+              retryForOldConnection = true;
+              debugRetry = true;
+            }
+          } while (retryForOldConnection);
+          if (debugRetry && logger.isDebugEnabled()) {
+            logger.debug("Done removing old connections");
           }
-          stopper.checkCancelInProgress(null);
-          throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_ABANDONED_BECAUSE_SHUTDOWN_IS_IN_PROGRESS.toLocalizedString());
-        } // shutdown in progress
-        
-        // Log the warning.  We wait until now, because we want
-        // to have m defined for a nice message...
-        if (memberInTrouble == null) {
-          logger.warn(LocalizedMessage.create(
-          LocalizedStrings.TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1,
-          new Object[] {memberAddress, problem}));
-          memberInTrouble = memberAddress;
-        }
-        else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Error sending message to {}", memberAddress, problem);
+
+          // we have a connection; fall through and return it
+        } catch (ConnectionException e) {
+          // Race condition between acquiring the connection and attempting
+          // to use it: another thread closed it.
+          problem = e;
+          // [sumedh] No need to retry since Connection.createSender has already
+          // done retries and now member is really unreachable for some reason
+          // even though it may be in the view
+          breakLoop = true;
+        } catch (IOException e) {
+          problem = e;
+          // bug #43962 don't keep trying to connect to an alert listener
+          if (AlertAppender.isThreadAlerting()) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Giving up connecting to alert listener {}", memberAddress);
+            }
+            breakLoop = true;
           }
         }
 
-        if (breakLoop) {
-          if (!problem.getMessage().startsWith("Cannot form connection to alert listener")) {
-              logger.warn(LocalizedMessage.create(
-                  LocalizedStrings.TCPConduit_THROWING_IOEXCEPTION_AFTER_FINDING_BREAKLOOP_TRUE),
-                  problem);
+        if (problem != null) {
+          // Some problems are not recoverable; check and error out early.
+          if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) { // left the view
+            // Bracket our original warning
+            if (memberInTrouble != null) {
+              // make this msg info to bracket warning
+              logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED, memberInTrouble));
+            }
+            throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(memberAddress));
+          } // left the view
+
+          if (membershipManager.shutdownInProgress()) { // shutdown in progress
+            // Bracket our original warning
+            if (memberInTrouble != null) {
+              // make this msg info to bracket warning
+              logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED, memberInTrouble));
+            }
+            stopper.checkCancelInProgress(null);
+            throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_ABANDONED_BECAUSE_SHUTDOWN_IS_IN_PROGRESS.toLocalizedString());
+          } // shutdown in progress
+
+          // Log the warning.  We wait until now, because we want
+          // to have m defined for a nice message...
+          if (memberInTrouble == null) {
+            logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1, new Object[] {
+              memberAddress,
+              problem
+            }));
+            memberInTrouble = memberAddress;
+          } else {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Error sending message to {}", memberAddress, problem);
+            }
           }
-          if (problem instanceof IOException) {
-            throw (IOException)problem;
+
+          if (breakLoop) {
+            if (!problem.getMessage().startsWith("Cannot form connection to alert listener")) {
+              logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_THROWING_IOEXCEPTION_AFTER_FINDING_BREAKLOOP_TRUE), problem);
+            }
+            if (problem instanceof IOException) {
+              throw (IOException) problem;
+            } else {
+              IOException ioe = new IOException(LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(memberAddress));
+              ioe.initCause(problem);
+              throw ioe;
+            }
           }
-          else {
-            IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(memberAddress));
-            ioe.initCause(problem);
-            throw ioe;
+          // Retry the operation (indefinitely)
+          continue;
+        } // problem != null
+        // Success!
+
+        // Make sure our logging is bracketed if there was a problem
+        if (memberInTrouble != null) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_SUCCESSFULLY_RECONNECTED_TO_MEMBER_0, memberInTrouble));
+          if (logger.isTraceEnabled()) {
+            logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
           }
         }
-        // Retry the operation (indefinitely)
-        continue;
-      } // problem != null
-      // Success!
-
-      // Make sure our logging is bracketed if there was a problem
-      if (memberInTrouble != null) {
-        logger.info(LocalizedMessage.create(
-            LocalizedStrings.TCPConduit_SUCCESSFULLY_RECONNECTED_TO_MEMBER_0,
-            memberInTrouble));
-        if (logger.isTraceEnabled()) {
-          logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
-        }
-      }
-      return conn;
-      }
-      finally {
+        return conn;
+      } finally {
         if (interrupted) {
           Thread.currentThread().interrupt();
         }
@@ -1077,19 +1105,21 @@ public class TCPConduit implements Runnable {
       }
     }
   }
+
   /**
    * Returns the distribution manager of the direct channel
    */
   public DM getDM() {
     return directChannel.getDM();
   }
+
   /**
    * Closes any connections used to communicate with the given member
    */
   public void removeEndpoint(DistributedMember mbr, String reason) {
     removeEndpoint(mbr, reason, true);
   }
-  
+
   public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) {
     ConnectionTable ct = this.conTable;
     if (ct == null) {
@@ -1097,13 +1127,15 @@ public class TCPConduit implements Runnable {
     }
     ct.removeEndpoint(mbr, reason, notifyDisconnect);
   }
-  
-  /** check to see if there are still any receiver threads for the given end-point */
+
+  /**
+   * check to see if there are still any receiver threads for the given end-point
+   */
   public boolean hasReceiversFor(DistributedMember endPoint) {
     ConnectionTable ct = this.conTable;
     return (ct != null) && ct.hasReceiversFor(endPoint);
   }
-  
+
   protected class Stopper extends CancelCriterion {
 
     /* (non-Javadoc)
@@ -1120,7 +1152,7 @@ public class TCPConduit implements Runnable {
       }
       return null;
     }
-    
+
     /* (non-Javadoc)
      * @see com.gemstone.gemfire.CancelCriterion#generateCancelledException(java.lang.Throwable)
      */
@@ -1144,14 +1176,14 @@ public class TCPConduit implements Runnable {
       return result;
     }
   }
-  
+
   private final Stopper stopper = new Stopper();
-  
+
   public CancelCriterion getCancelCriterion() {
     return stopper;
   }
-  
-  
+
+
   /**
    * if the conduit is disconnected due to an abnormal condition, this
    * will describe the reason
@@ -1160,7 +1192,7 @@ public class TCPConduit implements Runnable {
   public Exception getShutdownCause() {
     return this.shutdownCause;
   }
-  
+
   /**
    * ARB: Called by Connection before handshake reply is sent.
    * Returns true if member is part of view, false if membership is not confirmed before timeout.
@@ -1168,20 +1200,20 @@ public class TCPConduit implements Runnable {
   public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
     return membershipManager.waitForMembershipCheck(remoteId);
   }
-  
+
   /**
    * simulate being sick
    */
   public void beSick() {
-//    this.inhibitNewConnections = true;
-//    this.conTable.closeReceivers(true);
+    //    this.inhibitNewConnections = true;
+    //    this.conTable.closeReceivers(true);
   }
-  
+
   /**
    * simulate being healthy
    */
   public void beHealthy() {
-//    this.inhibitNewConnections = false;
+    //    this.inhibitNewConnections = false;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java
index 3af34e1..19679ad 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java
@@ -19,7 +19,7 @@ package com.gemstone.gemfire.management.internal;
 import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.distributed.internal.*;
 import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.management.ManagementService;
 import com.gemstone.gemfire.management.internal.JmxManagerAdvisor.JmxManagerProfile;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java
index 9807456..bb9bfb4 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java
@@ -16,15 +16,49 @@
  */
 package com.gemstone.gemfire.management.internal;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.rmi.AlreadyBoundException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.RMIClientSocketFactory;
+import java.rmi.server.RMIServerSocketFactory;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Set;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+import javax.management.remote.rmi.RMIServerImpl;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+
+import org.apache.logging.log4j.Logger;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+
 import com.gemstone.gemfire.GemFireConfigException;
 import com.gemstone.gemfire.cache.CacheFactory;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.internal.GemFireVersion;
-import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.lang.StringUtils;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.net.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
 import com.gemstone.gemfire.internal.security.shiro.JMXShiroAuthenticator;
 import com.gemstone.gemfire.internal.tcp.TCPConduit;
 import com.gemstone.gemfire.management.ManagementException;
@@ -34,44 +68,18 @@ import com.gemstone.gemfire.management.internal.security.AccessControlMBean;
 import com.gemstone.gemfire.management.internal.security.MBeanServerWrapper;
 import com.gemstone.gemfire.management.internal.security.ResourceConstants;
 import com.gemstone.gemfire.management.internal.unsafe.ReadOpFileAccessController;
-import org.apache.logging.log4j.Logger;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-
-import javax.management.*;
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXServiceURL;
-import javax.management.remote.rmi.RMIConnectorServer;
-import javax.management.remote.rmi.RMIJRMPServerImpl;
-import javax.management.remote.rmi.RMIServerImpl;
-import javax.rmi.ssl.SslRMIClientSocketFactory;
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.rmi.AlreadyBoundException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.RMIClientSocketFactory;
-import java.rmi.server.RMIServerSocketFactory;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.HashMap;
-import java.util.Set;
 
 /**
  * Agent implementation that controls the JMX server end points for JMX clients
  * to connect, such as an RMI server.
- * 
+ * <p>
  * The ManagementAgent could be used in a loner or GemFire client to define and
  * control JMX server end points for the Platform MBeanServer and the GemFire
  * MBeans hosted within it.
- *
  * @since GemFire 7.0
  */
 public class ManagementAgent {
+
   private static final Logger logger = LogService.getLogger();
 
   /**
@@ -115,9 +123,10 @@ public class ManagementAgent {
   }
 
   private boolean isServerNode(GemFireCacheImpl cache) {
-    return (cache.getDistributedSystem().getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE
-        && cache.getDistributedSystem().getDistributedMember().getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE && !cache
-          .isClient());
+    return (cache.getDistributedSystem().getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE && cache.getDistributedSystem()
+                                                                                                                           .getDistributedMember()
+                                                                                                                           .getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE && !cache
+      .isClient());
   }
 
   public synchronized void startAgent(GemFireCacheImpl cache) {
@@ -128,8 +137,7 @@ public class ManagementAgent {
       startHttpService(isServerNode(cache));
     } else {
       if (logger.isDebugEnabled()) {
-        logger
-            .debug("Developer REST APIs webapp is already running, Not Starting M&M REST and pulse!");
+        logger.debug("Developer REST APIs webapp is already running, Not Starting M&M REST and pulse!");
       }
     }
 
@@ -146,8 +154,9 @@ public class ManagementAgent {
   public synchronized void stopAgent() {
     stopHttpService();
 
-    if (!this.running)
+    if (!this.running) {
       return;
+    }
 
     if (logger.isDebugEnabled()) {
       logger.debug("Stopping jmx manager agent");
@@ -167,15 +176,13 @@ public class ManagementAgent {
   private AgentUtil agentUtil = new AgentUtil(GEMFIRE_VERSION);
 
   private void startHttpService(boolean isServer) {
-    final SystemManagementService managementService = (SystemManagementService) ManagementService
-        .getManagementService(CacheFactory.getAnyInstance());
+    final SystemManagementService managementService = (SystemManagementService) ManagementService.getManagementService(CacheFactory.getAnyInstance());
 
     final ManagerMXBean managerBean = managementService.getManagerMXBean();
 
     if (this.config.getHttpServicePort() != 0) {
       if (logger.isDebugEnabled()) {
-        logger.debug("Attempting to start HTTP service on port ({}) at bind-address ({})...",
-            this.config.getHttpServicePort(), this.config.getHttpServiceBindAddress());
+        logger.debug("Attempting to start HTTP service on port ({}) at bind-address ({})...", this.config.getHttpServicePort(), this.config.getHttpServiceBindAddress());
       }
 
       // Find the Management WAR file
@@ -195,8 +202,7 @@ public class ManagementAgent {
         if (logger.isDebugEnabled()) {
           logger.debug(message);
         }
-      }
-      else if (isCustomAuthenticator()){
+      } else if (isCustomAuthenticator()) {
         System.setProperty("spring.profiles.active", "pulse.authentication.gemfire");
       }
 
@@ -218,15 +224,11 @@ public class ManagementAgent {
 
           boolean isRestWebAppAdded = false;
 
-          this.httpServer = JettyHelper.initJetty(bindAddress, port,
-              this.config.getHttpServiceSSLEnabled(),
-              this.config.getHttpServiceSSLRequireAuthentication(),
-              this.config.getHttpServiceSSLProtocols(), this.config.getHttpServiceSSLCiphers(),
-              this.config.getHttpServiceSSLProperties());
+          this.httpServer = JettyHelper.initJetty(bindAddress, port, this.config.getHttpServiceSSLEnabled(), this.config.getHttpServiceSSLRequireAuthentication(), this.config
+            .getHttpServiceSSLProtocols(), this.config.getHttpServiceSSLCiphers(), this.config.getHttpServiceSSLProperties());
 
           if (agentUtil.isWebApplicationAvailable(gemfireWar)) {
-            this.httpServer = JettyHelper
-                .addWebApplication(this.httpServer, "/gemfire", gemfireWar);
+            this.httpServer = JettyHelper.addWebApplication(this.httpServer, "/gemfire", gemfireWar);
           }
 
           if (agentUtil.isWebApplicationAvailable(pulseWar)) {
@@ -235,8 +237,7 @@ public class ManagementAgent {
 
           if (isServer && this.config.getStartDevRestApi()) {
             if (agentUtil.isWebApplicationAvailable(gemfireAPIWar)) {
-              this.httpServer = JettyHelper.addWebApplication(this.httpServer, "/gemfire-api",
-                  gemfireAPIWar);
+              this.httpServer = JettyHelper.addWebApplication(this.httpServer, "/gemfire-api", gemfireAPIWar);
               isRestWebAppAdded = true;
             }
           } else {
@@ -248,8 +249,7 @@ public class ManagementAgent {
           }
 
           if (logger.isDebugEnabled()) {
-            logger.debug("Starting HTTP embedded server on port ({}) at bind-address ({})...",
-                ((ServerConnector) this.httpServer.getConnectors()[0]).getPort(), bindAddress);
+            logger.debug("Starting HTTP embedded server on port ({}) at bind-address ({})...", ((ServerConnector) this.httpServer.getConnectors()[0]).getPort(), bindAddress);
           }
 
           System.setProperty(PULSE_EMBEDDED_PROP, "true");
@@ -259,8 +259,7 @@ public class ManagementAgent {
           // now, that Tomcat has been started, we can set the URL used by web
           // clients to connect to Pulse
           if (agentUtil.isWebApplicationAvailable(pulseWar)) {
-            managerBean.setPulseURL("http://".concat(getHost(bindAddress)).concat(":")
-                .concat(String.valueOf(port)).concat("/pulse/"));
+            managerBean.setPulseURL("http://".concat(getHost(bindAddress)).concat(":").concat(String.valueOf(port)).concat("/pulse/"));
           }
 
           // set cache property for developer REST service running
@@ -278,15 +277,13 @@ public class ManagementAgent {
         }
       } catch (Exception e) {
         stopHttpService();// Jetty needs to be stopped even if it has failed to
-                          // start. Some of the threads are left behind even if
-                          // server.start() fails due to an exception
-        setStatusMessage(managerBean, "HTTP service failed to start with "
-            + e.getClass().getSimpleName() + " '" + e.getMessage() + "'");
+        // start. Some of the threads are left behind even if
+        // server.start() fails due to an exception
+        setStatusMessage(managerBean, "HTTP service failed to start with " + e.getClass().getSimpleName() + " '" + e.getMessage() + "'");
         throw new ManagementException("HTTP service failed to start", e);
       }
     } else {
-      setStatusMessage(managerBean,
-          "Embedded HTTP server configured not to start (http-service-port=0) or (jmx-manager-http-port=0)");
+      setStatusMessage(managerBean, "Embedded HTTP server configured not to start (http-service-port=0) or (jmx-manager-http-port=0)");
     }
   }
 
@@ -318,8 +315,7 @@ public class ManagementAgent {
         try {
           this.httpServer.destroy();
         } catch (Exception ignore) {
-          logger.error("Failed to properly release resources held by the HTTP service: {}",
-              ignore.getMessage(), ignore);
+          logger.error("Failed to properly release resources held by the HTTP service: {}", ignore.getMessage(), ignore);
         } finally {
           this.httpServer = null;
           System.clearProperty("catalina.base");
@@ -357,14 +353,10 @@ public class ManagementAgent {
     final boolean ssl = this.config.getJmxManagerSSLEnabled();
 
     if (logger.isDebugEnabled()) {
-      logger.debug("Starting jmx manager agent on port {}{}", port,
-          (bindAddr != null ? (" bound to " + bindAddr) : "") + (ssl ? " using SSL" : ""));
+      logger.debug("Starting jmx manager agent on port {}{}", port, (bindAddr != null ? (" bound to " + bindAddr) : "") + (ssl ? " using SSL" : ""));
     }
 
-    final SocketCreator sc = SocketCreator.createNonDefaultInstance(ssl,
-        this.config.getJmxManagerSSLRequireAuthentication(),
-        this.config.getJmxManagerSSLProtocols(), this.config.getJmxManagerSSLCiphers(),
-        this.config.getJmxSSLProperties());
+    final SocketCreator sc = SocketCreatorFactory.getJMXManagerSSLSocketCreator();
     RMIClientSocketFactory csf = ssl ? new SslRMIClientSocketFactory() : null;// RMISocketFactory.getDefaultSocketFactory();
     // new GemFireRMIClientSocketFactory(sc, getLogger());
     RMIServerSocketFactory ssf = new GemFireRMIServerSocketFactory(sc, bindAddr);
@@ -413,8 +405,7 @@ public class ManagementAgent {
     //
     // We construct a JMXServiceURL corresponding to what we have done
     // for our stub...
-    final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://" + hostname + ":" + port
-        + "/jndi/rmi://" + hostname + ":" + port + "/jmxrmi");
+    final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://" + hostname + ":" + port + "/jndi/rmi://" + hostname + ":" + port + "/jmxrmi");
 
     // Create an RMI connector server with the JMXServiceURL
     //
@@ -441,7 +432,7 @@ public class ManagementAgent {
     };
 
     String shiroConfig = this.config.getShiroInit();
-    if (! StringUtils.isBlank(shiroConfig) || isCustomAuthenticator()) {
+    if (!StringUtils.isBlank(shiroConfig) || isCustomAuthenticator()) {
       shiroAuthenticator = new JMXShiroAuthenticator();
       env.put(JMXConnectorServer.AUTHENTICATOR, shiroAuthenticator);
       cs.addNotificationListener(shiroAuthenticator, null, cs.getAttributes());
@@ -450,9 +441,7 @@ public class ManagementAgent {
       MBeanServerWrapper mBeanServerWrapper = new MBeanServerWrapper();
       cs.setMBeanServerForwarder(mBeanServerWrapper);
       registerAccessControlMBean();
-    }
-
-    else {
+    } else {
       /* Disable the old authenticator mechanism */
       String pwFile = this.config.getJmxManagerPasswordFile();
       if (pwFile != null && pwFile.length() > 0) {
@@ -511,11 +500,11 @@ public class ManagementAgent {
     return factoryName != null && !factoryName.isEmpty();
   }
 
-  private static class GemFireRMIClientSocketFactory implements RMIClientSocketFactory,
-      Serializable {
+  private static class GemFireRMIClientSocketFactory implements RMIClientSocketFactory, Serializable {
+
     private static final long serialVersionUID = -7604285019188827617L;
 
-    private/* final hack to prevent serialization */transient SocketCreator sc;
+    private/* final hack to prevent serialization */ transient SocketCreator sc;
 
     public GemFireRMIClientSocketFactory(SocketCreator sc) {
       this.sc = sc;
@@ -525,12 +514,14 @@ public class ManagementAgent {
     public Socket createSocket(String host, int port) throws IOException {
       return this.sc.connectForClient(host, port, 0/* no timeout */);
     }
-  };
+  }
+
+  ;
+
+  private static class GemFireRMIServerSocketFactory implements RMIServerSocketFactory, Serializable {
 
-  private static class GemFireRMIServerSocketFactory implements RMIServerSocketFactory,
-      Serializable {
     private static final long serialVersionUID = -811909050641332716L;
-    private/* final hack to prevent serialization */transient SocketCreator sc;
+    private/* final hack to prevent serialization */ transient SocketCreator sc;
     private final InetAddress bindAddr;
 
     public GemFireRMIServerSocketFactory(SocketCreator sc, InetAddress bindAddr) {
@@ -542,5 +533,7 @@ public class ManagementAgent {
     public ServerSocket createServerSocket(int port) throws IOException {
       return this.sc.createServerSocket(port, TCPConduit.getBackLog(), this.bindAddr);
     }
-  };
+  }
+
+  ;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
index 8b41548..53f0894 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
@@ -20,7 +20,7 @@ package com.gemstone.gemfire.management.internal;
 import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.internal.GemFireVersion;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
 import com.gemstone.gemfire.internal.logging.LogService;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
index 89cc4f0..efc48fa 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
@@ -43,6 +43,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppender;
 import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppenders;
+import com.gemstone.gemfire.internal.net.SocketCreator;
 import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
 import com.gemstone.gemfire.internal.offheap.OffHeapMemoryStats;
 import com.gemstone.gemfire.internal.process.PidUnavailableException;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java
index c67a4bc..bcdbdc5 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java
@@ -32,7 +32,7 @@ import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
 import com.gemstone.gemfire.internal.DistributionLocator;
 import com.gemstone.gemfire.internal.GemFireVersion;
 import com.gemstone.gemfire.internal.OSProcess;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
 import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberPattern;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.lang.ClassUtils;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java b/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java
index ba0b479..6e81309 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java
@@ -36,7 +36,7 @@ import com.gemstone.gemfire.cache.CacheFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.memcached.ConnectionHandler;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java b/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
index 81f87d5..00a38db 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
@@ -22,7 +22,7 @@ import com.gemstone.gemfire.LogWriter;
 import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
 import com.gemstone.gemfire.internal.redis.*;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
index 3ae6cf6..f959f67 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
@@ -61,6 +61,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifierStats
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 import com.gemstone.gemfire.internal.logging.InternalLogWriter;
 import com.gemstone.gemfire.internal.logging.LocalLogWriter;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
 import com.gemstone.gemfire.test.dunit.AsyncInvocation;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.Invoke;
@@ -132,6 +133,7 @@ public class ConnectionPoolDUnitTest extends JUnit4CacheTestCase {
       }
     });
     postTearDownConnectionPoolDUnitTest();
+
   }
   
   protected void postTearDownConnectionPoolDUnitTest() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
index ee5cc62..c9c04fd 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
@@ -37,6 +37,7 @@ import com.gemstone.gemfire.cache.client.ClientCacheFactory;
 import com.gemstone.gemfire.cache.client.ClientRegionFactory;
 import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
 import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
 import com.gemstone.gemfire.security.AuthenticationRequiredException;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.IgnoredException;
@@ -223,12 +224,14 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
   public static void closeCacheTask(){
     if (instance != null && instance.cache != null) {
       instance.cache.close();
+      SocketCreatorFactory.close();
     }
   }
 
   public static void closeClientCacheTask(){
     if (instance != null && instance.clientCache != null) {
       instance.clientCache.close();
+      SocketCreatorFactory.close();
     }
   }
 
@@ -370,6 +373,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     VM clientVM = host.getVM(2);
     clientVM.invoke(() -> closeClientCacheTask());
     serverVM.invoke(() -> closeCacheTask());
+
   }
 }
 


Mime
View raw message