geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [geode] 01/01: GEODE-2113 Implement SSL over NIO
Date Wed, 05 Dec 2018 22:21:16 GMT
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-2113
in repository https://gitbox.apache.org/repos/asf/geode.git

commit dd5f6f4a4cbb6bcc35526d8dd793176099cdc3a2
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
AuthorDate: Wed Dec 5 14:09:51 2018 -0800

    GEODE-2113 Implement SSL over NIO
    
    This removes old-I/O use in TCPConduit peer-to-peer communications.
    This was used for SSL/TLS secure commuications but Java has had an
    SSLEngine implementation that allows you to implement secure
    communications on new-I/O SocketChannels or any other transport
    mechanism.
    
    A new NioSSLEngine class wraps the JDK's SSLEngine and provides the SSL
    handshake as well as encryption/decryption of messages.  SocketCreator
    performs the SSL handshake and returns a NioSslEngine that TCPConduit
    then uses for messaging.
    
    I've also done a lot of cleanup of compilation warnings in
    Connection.java and removed references to "NIO".  The primary SSL/TLS
    changes in that class are in writeFully (renamed from nioWriteFully)
    and processBuffer (renamed from processNIOBuffer).
    
    Porting client/server to use NioSSLEngine will be done under a separate
    ticket and a different version of NioEngine may be created to secure
    UDP messaging.
---
 .../CacheServerSSLConnectionDUnitTest.java         |   93 +-
 ...ToDataThrowsRuntimeExceptionRegressionTest.java |    3 -
 .../internal/net/SSLSocketIntegrationTest.java     |  133 +-
 .../distributed/internal/DistributionStats.java    |    2 +-
 .../distributed/internal/direct/DirectChannel.java |    6 +-
 .../geode/internal/{tcp => net}/Buffers.java       |   50 +-
 .../NioEngine.java}                                |   31 +-
 .../org/apache/geode/internal/net/NioFilter.java   |   47 +
 .../apache/geode/internal/net/NioSslEngine.java    |  281 +++
 .../apache/geode/internal/net/SocketCreator.java   |   42 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 1798 +++++++-------------
 .../apache/geode/internal/tcp/ConnectionTable.java |    6 +-
 .../apache/geode/internal/tcp/MsgOutputStream.java |    3 +-
 .../org/apache/geode/internal/tcp/MsgStreamer.java |    1 +
 .../apache/geode/internal/tcp/NIOMsgReader.java    |    1 +
 .../geode/internal/tcp/PeerConnectionFactory.java  |   11 +-
 .../org/apache/geode/internal/tcp/TCPConduit.java  |  187 +-
 .../geode/internal/tcp/ConnectionJUnitTest.java    |   12 +-
 .../geode/internal/tcp/ConnectionTableTest.java    |    8 +-
 .../apache/geode/internal/tcp/ConnectionTest.java  |    4 +-
 .../util/PluckStacksJstackGeneratedDump.txt        |   18 +-
 21 files changed, 1291 insertions(+), 1446 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/CacheServerSSLConnectionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
index 58ba260..a562975 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
@@ -23,7 +23,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.CLUSTER_SSL_P
 import static org.apache.geode.distributed.ConfigurationProperties.CLUSTER_SSL_REQUIRE_AUTHENTICATION;
 import static org.apache.geode.distributed.ConfigurationProperties.CLUSTER_SSL_TRUSTSTORE;
 import static org.apache.geode.distributed.ConfigurationProperties.CLUSTER_SSL_TRUSTSTORE_PASSWORD;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.SERVER_SSL_CIPHERS;
 import static org.apache.geode.distributed.ConfigurationProperties.SERVER_SSL_ENABLED;
@@ -49,6 +48,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -77,6 +77,7 @@ import org.apache.geode.cache.client.ClientRegionFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.client.NoAvailableServersException;
 import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.Locator;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
 import org.apache.geode.security.AuthenticationRequiredException;
 import org.apache.geode.test.dunit.AsyncInvocation;
@@ -144,7 +145,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
 
   public Cache createCache(Properties props) throws Exception {
     props.setProperty(MCAST_PORT, "0");
-    props.setProperty(LOCATORS, "");
+    // props.setProperty(LOCATORS, "");
     cache = new CacheFactory(props).create();
     if (cache == null) {
       throw new Exception("CacheFactory.create() returned null ");
@@ -170,30 +171,21 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
   }
 
   @SuppressWarnings("rawtypes")
-  public void setUpServerVM(final boolean cacheServerSslenabled) throws Exception {
-    System.setProperty("javax.net.debug", "ssl,handshake");
+  public void setUpServerVM(final boolean cacheServerSslenabled, int optionalLocatorPort)
+      throws Exception {
+    // System.setProperty("javax.net.debug", "ssl,handshake");
 
     Properties gemFireProps = new Properties();
+    if (optionalLocatorPort > 0) {
+      gemFireProps.put("locators", "localhost[" + optionalLocatorPort + "]");
+    }
 
     String cacheServerSslprotocols = "any";
     String cacheServerSslciphers = "any";
     boolean cacheServerSslRequireAuth = true;
     if (!useOldSSLSettings) {
-      gemFireProps.put(SSL_ENABLED_COMPONENTS,
-          SecurableCommunicationChannel.CLUSTER + "," + SecurableCommunicationChannel.SERVER);
-      gemFireProps.put(SSL_PROTOCOLS, cacheServerSslprotocols);
-      gemFireProps.put(SSL_CIPHERS, cacheServerSslciphers);
-      gemFireProps.put(SSL_REQUIRE_AUTHENTICATION, String.valueOf(cacheServerSslRequireAuth));
-
-      String keyStore =
-          TestUtil.getResourcePath(CacheServerSSLConnectionDUnitTest.class, SERVER_KEY_STORE);
-      String trustStore =
-          TestUtil.getResourcePath(CacheServerSSLConnectionDUnitTest.class, SERVER_TRUST_STORE);
-      gemFireProps.put(SSL_KEYSTORE_TYPE, "jks");
-      gemFireProps.put(SSL_KEYSTORE, keyStore);
-      gemFireProps.put(SSL_KEYSTORE_PASSWORD, "password");
-      gemFireProps.put(SSL_TRUSTSTORE, trustStore);
-      gemFireProps.put(SSL_TRUSTSTORE_PASSWORD, "password");
+      getNewSSLSettings(gemFireProps, cacheServerSslprotocols, cacheServerSslciphers,
+          cacheServerSslRequireAuth);
     } else {
       gemFireProps.put(CLUSTER_SSL_ENABLED, String.valueOf(cacheServerSslenabled));
       gemFireProps.put(CLUSTER_SSL_PROTOCOLS, cacheServerSslprotocols);
@@ -222,6 +214,25 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     r.put("serverkey", "servervalue");
   }
 
+  private void getNewSSLSettings(Properties gemFireProps, String cacheServerSslprotocols,
+      String cacheServerSslciphers, boolean cacheServerSslRequireAuth) {
+    gemFireProps.put(SSL_ENABLED_COMPONENTS,
+        SecurableCommunicationChannel.CLUSTER + "," + SecurableCommunicationChannel.SERVER);
+    gemFireProps.put(SSL_PROTOCOLS, cacheServerSslprotocols);
+    gemFireProps.put(SSL_CIPHERS, cacheServerSslciphers);
+    gemFireProps.put(SSL_REQUIRE_AUTHENTICATION, String.valueOf(cacheServerSslRequireAuth));
+
+    String keyStore =
+        TestUtil.getResourcePath(CacheServerSSLConnectionDUnitTest.class, SERVER_KEY_STORE);
+    String trustStore =
+        TestUtil.getResourcePath(CacheServerSSLConnectionDUnitTest.class, SERVER_TRUST_STORE);
+    gemFireProps.put(SSL_KEYSTORE_TYPE, "jks");
+    gemFireProps.put(SSL_KEYSTORE, keyStore);
+    gemFireProps.put(SSL_KEYSTORE_PASSWORD, "password");
+    gemFireProps.put(SSL_TRUSTSTORE, trustStore);
+    gemFireProps.put(SSL_TRUSTSTORE_PASSWORD, "password");
+  }
+
   public void setUpClientVM(String host, int port, boolean cacheServerSslenabled,
       boolean cacheServerSslRequireAuth, String keyStore, String trustStore, boolean subscription,
       boolean clientHasTrustedKeystore) {
@@ -308,8 +319,9 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
   }
 
 
-  public static void setUpServerVMTask(boolean cacheServerSslenabled) throws Exception {
-    instance.setUpServerVM(cacheServerSslenabled);
+  public static void setUpServerVMTask(boolean cacheServerSslenabled, int optionalLocatorPort)
+      throws Exception {
+    instance.setUpServerVM(cacheServerSslenabled, optionalLocatorPort);
   }
 
   public static int createServerTask() throws Exception {
@@ -371,20 +383,35 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     final Host host = Host.getHost(0);
     VM serverVM = host.getVM(1);
     VM clientVM = host.getVM(2);
+    VM serverVM2 = host.getVM(3);
 
     boolean cacheServerSslenabled = true;
     boolean cacheClientSslenabled = true;
     boolean cacheClientSslRequireAuth = true;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
-    int port = serverVM.invoke(() -> createServerTask());
+    Properties locatorProps = new Properties();
+    String cacheServerSslprotocols = "any";
+    String cacheServerSslciphers = "any";
+    boolean cacheServerSslRequireAuth = true;
+    getNewSSLSettings(locatorProps, cacheServerSslprotocols, cacheServerSslciphers,
+        cacheServerSslRequireAuth);
+    Locator locator = Locator.startLocatorAndDS(0, new File(""), locatorProps);
+    int locatorPort = locator.getPort();
+    try {
+      serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, locatorPort));
+      int port = serverVM.invoke(() -> createServerTask());
+      serverVM2.invoke(() -> setUpServerVMTask(cacheServerSslenabled, locatorPort));
+      serverVM2.invoke(() -> createServerTask());
 
-    String hostName = host.getHostName();
+      String hostName = host.getHostName();
 
-    clientVM.invoke(() -> setUpClientVMTask(hostName, port, cacheClientSslenabled,
-        cacheClientSslRequireAuth, CLIENT_KEY_STORE, CLIENT_TRUST_STORE, true));
-    clientVM.invoke(() -> doClientRegionTestTask());
-    serverVM.invoke(() -> doServerRegionTestTask());
+      clientVM.invoke(() -> setUpClientVMTask(hostName, port, cacheClientSslenabled,
+          cacheClientSslRequireAuth, CLIENT_KEY_STORE, CLIENT_TRUST_STORE, true));
+      clientVM.invoke(() -> doClientRegionTestTask());
+      serverVM.invoke(() -> doServerRegionTestTask());
+    } finally {
+      locator.stop();
+    }
   }
 
   /**
@@ -413,7 +440,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     boolean cacheClientSslenabled = true;
     boolean cacheClientSslRequireAuth = true;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     int port = serverVM.invoke(() -> createServerTask());
 
     String hostName = host.getHostName();
@@ -464,7 +491,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     boolean cacheClientSslenabled = false;
     boolean cacheClientSslRequireAuth = true;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     serverVM.invoke(() -> createServerTask());
 
     Object array[] = (Object[]) serverVM.invoke(() -> getCacheServerEndPointTask());
@@ -511,7 +538,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     IgnoredException.addIgnoredException("SSLHandshakeException");
     IgnoredException.addIgnoredException("ValidatorException");
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     serverVM.invoke(() -> createServerTask());
 
     Object array[] = (Object[]) serverVM.invoke(() -> getCacheServerEndPointTask());
@@ -534,7 +561,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     boolean cacheClientSslenabled = true;
     boolean cacheClientSslRequireAuth = false;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     serverVM.invoke(() -> createServerTask());
 
     Object array[] = (Object[]) serverVM.invoke(() -> getCacheServerEndPointTask());
@@ -567,7 +594,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
     boolean cacheClientSslenabled = true;
     boolean cacheClientSslRequireAuth = true;
 
-    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled));
+    serverVM.invoke(() -> setUpServerVMTask(cacheServerSslenabled, 0));
     serverVM.invoke(() -> createServerTask());
 
     Object array[] = (Object[]) serverVM.invoke(() -> getCacheServerEndPointTask());
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ValueToDataThrowsRuntimeExceptionRegressionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ValueToDataThrowsRuntimeExceptionRegressionTest.java
index e2166b0..7e469b9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ValueToDataThrowsRuntimeExceptionRegressionTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/ValueToDataThrowsRuntimeExceptionRegressionTest.java
@@ -97,12 +97,10 @@ public class ValueToDataThrowsRuntimeExceptionRegressionTest extends JUnit4Cache
       Invoke.invokeInEveryVM(new SerializableCallable() {
         @Override
         public Object call() throws Exception {
-          System.getProperties().remove("p2p.oldIO");
           System.getProperties().remove("p2p.nodirectBuffers");
           return null;
         }
       });
-      System.getProperties().remove("p2p.oldIO");
       System.getProperties().remove("p2p.nodirectBuffers");
     }
   }
@@ -110,7 +108,6 @@ public class ValueToDataThrowsRuntimeExceptionRegressionTest extends JUnit4Cache
   @Override
   public Properties getDistributedSystemProperties() {
     Properties props = new Properties();
-    System.setProperty("p2p.oldIO", "true");
     props.setProperty(CONSERVE_SOCKETS, "true");
     // props.setProperty(DistributionConfig.ConfigurationProperties.MCAST_PORT, "12333");
     // props.setProperty(DistributionConfig.DISABLE_TCP_NAME, "true");
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 59a8355..d37b39d 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -28,18 +28,26 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.Properties;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLContext;
@@ -57,7 +65,9 @@ import org.junit.rules.TestName;
 
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.internal.ByteBufferOutputStream;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.geode.internal.tcp.ByteBufferInputStream;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
 /**
@@ -104,6 +114,8 @@ public class SSLSocketIntegrationTest {
     System.setProperty("javax.net.ssl.trustStorePassword", "password");
     System.setProperty("javax.net.ssl.keyStore", keystore.getCanonicalPath());
     System.setProperty("javax.net.ssl.keyStorePassword", "password");
+    // System.setProperty("javax.net.debug", "ssl,handshake");
+
 
     Properties properties = new Properties();
     properties.setProperty(MCAST_PORT, "0");
@@ -174,6 +186,99 @@ public class SSLSocketIntegrationTest {
     assertThat(this.messageFromClient.get()).isEqualTo(MESSAGE);
   }
 
+  @Test
+  public void testSecuredSocketTransmissionShouldWorkUsingNIO() throws Exception {
+    ServerSocketChannel serverChannel = ServerSocketChannel.open();
+    serverSocket = serverChannel.socket();
+
+    InetSocketAddress addr = new InetSocketAddress(localHost, 0);
+    serverSocket.bind(addr, 10);
+    int serverPort = this.serverSocket.getLocalPort();
+
+    SocketCreator clusterSocketCreator =
+        SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
+    this.serverThread = startServerNIO(serverSocket, 15000);
+
+    await().until(() -> serverThread.isAlive());
+
+    SocketChannel clientChannel = SocketChannel.open();
+    clientChannel.connect(new InetSocketAddress(localHost, serverPort));
+    clientSocket = clientChannel.socket();
+    NioSslEngine engine =
+        clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(), 0, true,
+            false);
+    clientChannel.configureBlocking(true);
+
+    // transmit expected string from Client to Server
+    writeMessageToNIOSSLServer(clientChannel, engine);
+    writeMessageToNIOSSLServer(clientChannel, engine);
+    // this is the real assertion of this test
+    await().until(() -> {
+      return !serverThread.isAlive();
+    });
+    assertNull(serverException);
+    // assertThat(this.messageFromClient.get()).isEqualTo(MESSAGE);
+  }
+
+  private void writeMessageToNIOSSLServer(SocketChannel clientChannel, NioSslEngine engine)
+      throws IOException {
+    System.out.println("client sending Hello World message to server");
+    ByteBufferOutputStream bbos = new ByteBufferOutputStream(5000);
+    DataOutputStream dos = new DataOutputStream(bbos);
+    dos.writeUTF("Hello world");
+    dos.flush();
+    bbos.flush();
+    ByteBuffer buffer = bbos.getContentBuffer();
+    System.out.println(
+        "client buffer position is " + buffer.position() + " and limit is " + buffer.limit());
+    ByteBuffer wrappedBuffer = engine.wrap(buffer);
+    System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
+        + " and limit is " + wrappedBuffer.limit());
+    int bytesWritten = clientChannel.write(wrappedBuffer);
+    System.out.println("client bytes written is " + bytesWritten);
+  }
+
+  private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis)
+      throws Exception {
+    Thread serverThread = new Thread(new MyThreadGroup(this.testName.getMethodName()), () -> {
+      try {
+        ByteBuffer buffer = ByteBuffer.allocate(5000);
+
+        Socket socket = serverSocket.accept();
+        SocketCreator sc = SocketCreatorFactory.getSocketCreatorForComponent(CLUSTER);
+        NioSslEngine engine =
+            sc.handshakeSSLSocketChannel(socket.getChannel(), timeoutMillis, false, false);
+
+        readMessageFromNIOSSLClient(socket, buffer, engine);
+        readMessageFromNIOSSLClient(socket, buffer, engine);
+      } catch (Throwable throwable) {
+        throwable.printStackTrace(System.out);
+        serverException = throwable;
+      }
+    }, this.testName.getMethodName() + "-server");
+
+    serverThread.start();
+    return serverThread;
+  }
+
+  private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine)
+      throws IOException {
+    int bytesRead = socket.getChannel().read(buffer);
+    buffer.flip();
+    System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+        + buffer.position() + " and limit is " + buffer.limit());
+    ByteBuffer unwrapped = engine.unwrap(buffer);
+    System.out.println("server unwrapped buffer position is " + unwrapped.position()
+        + " and limit is " + unwrapped.limit());
+    ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped);
+    DataInputStream dis = new DataInputStream(bbis);
+    String welcome = dis.readUTF();
+    engine.doneReading(unwrapped);
+    assertThat(welcome).isEqualTo("Hello world");
+    System.out.println("server read Hello World message from client");
+  }
+
+
   @Test(expected = SocketTimeoutException.class)
   public void handshakeCanTimeoutOnServer() throws Throwable {
     this.serverSocket = this.socketCreator.createServerSocket(0, 0, this.localHost);
@@ -187,6 +292,33 @@ public class SSLSocketIntegrationTest {
     throw serverException;
   }
 
+  @Test(expected = SocketTimeoutException.class)
+  public void handshakeWithPeerCanTimeout() throws Throwable {
+    ServerSocketChannel serverChannel = ServerSocketChannel.open();
+    serverSocket = serverChannel.socket();
+
+    InetSocketAddress addr = new InetSocketAddress(localHost, 0);
+    serverSocket.bind(addr, 10);
+    int serverPort = this.serverSocket.getLocalPort();
+
+    this.serverThread = startServerNIO(this.serverSocket, 1000);
+
+    Socket socket = new Socket();
+    await().atMost(10, TimeUnit.SECONDS).until(() -> {
+      try {
+        socket.connect(new InetSocketAddress(localHost, serverPort));
+      } catch (ConnectException e) {
+        return false;
+      } catch (SocketException e) {
+        return true; // server socket was closed
+      }
+      return true;
+    });
+    await().untilAsserted(() -> assertFalse(serverThread.isAlive()));
+    assertNotNull(serverException);
+    throw serverException;
+  }
+
   @Test
   public void configureClientSSLSocketCanTimeOut() throws Exception {
     final Semaphore serverCoordination = new Semaphore(0);
@@ -271,7 +403,6 @@ public class SSLSocketIntegrationTest {
 
   private Thread startServer(final ServerSocket serverSocket, int timeoutMillis) throws Exception {
     Thread serverThread = new Thread(new MyThreadGroup(this.testName.getMethodName()), () -> {
-      long startTime = System.currentTimeMillis();
       try {
         Socket socket = serverSocket.accept();
         SocketCreatorFactory.getSocketCreatorForComponent(CLUSTER).handshakeIfSocketIsSSL(socket,
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index ad109bd..9fa699f 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -23,8 +23,8 @@ import org.apache.geode.StatisticsType;
 import org.apache.geode.StatisticsTypeFactory;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.Buffers;
 import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
-import org.apache.geode.internal.tcp.Buffers;
 import org.apache.geode.internal.util.Breadcrumbs;
 
 /**
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index dbb4068..8fa9f84 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -493,7 +493,7 @@ public class DirectChannel {
       }
       try {
         try {
-          con.readAck(msToWait, msInterval, processor);
+          con.readAck(processor);
         } catch (SocketTimeoutException ex) {
           handleAckTimeout(ackTimeout, ackSDTimeout, con, processor);
         }
@@ -688,7 +688,7 @@ public class DirectChannel {
       // wait for ack-severe-alert-threshold period first, then wait forever
       if (ackSATimeout > 0) {
         try {
-          c.readAck((int) ackSATimeout, ackSATimeout, processor);
+          c.readAck(processor);
           return;
         } catch (SocketTimeoutException e) {
           Object[] args = new Object[] {Long.valueOf((ackSATimeout + ackTimeout) / 1000),
@@ -699,7 +699,7 @@ public class DirectChannel {
         }
       }
       try {
-        c.readAck(0, 0, processor);
+        c.readAck(processor);
       } catch (SocketTimeoutException ex) {
         // this can never happen when called with timeout of 0
         logger.error(String.format("Unexpected timeout while waiting for ack from %s",
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
similarity index 75%
rename from geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
rename to geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
index abb7fdb..9213342 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
@@ -12,12 +12,11 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.tcp;
+package org.apache.geode.internal.net;
 
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import java.util.IdentityHashMap;
-import java.util.Iterator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.geode.distributed.internal.DMStats;
@@ -27,27 +26,33 @@ public class Buffers {
   /**
    * A list of soft references to byte buffers.
    */
-  private static final ConcurrentLinkedQueue bufferQueue = new ConcurrentLinkedQueue();
+  private static final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+      new ConcurrentLinkedQueue<>();
+
+  /**
+   * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
+   */
+  public static boolean useDirectBuffers = !Boolean.getBoolean("p2p.nodirectBuffers");
 
   /**
    * Should only be called by threads that have currently acquired send permission.
    *
    * @return a byte buffer to be used for sending on this connection.
    */
-  static ByteBuffer acquireSenderBuffer(int size, DMStats stats) {
+  public static ByteBuffer acquireSenderBuffer(int size, DMStats stats) {
     return acquireBuffer(size, stats, true);
   }
 
-  static ByteBuffer acquireReceiveBuffer(int size, DMStats stats) {
+  public static ByteBuffer acquireReceiveBuffer(int size, DMStats stats) {
     return acquireBuffer(size, stats, false);
   }
 
-  static ByteBuffer acquireBuffer(int size, DMStats stats, boolean send) {
+  private static ByteBuffer acquireBuffer(int size, DMStats stats, boolean send) {
     ByteBuffer result;
-    if (TCPConduit.useDirectBuffers) {
+    if (useDirectBuffers) {
       IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
                                                                             // set
-      BBSoftReference ref = (BBSoftReference) bufferQueue.poll();
+      BBSoftReference ref = bufferQueue.poll();
       while (ref != null) {
         ByteBuffer bb = ref.getBB();
         if (bb == null) {
@@ -68,7 +73,7 @@ public class Buffers {
           // wasn't big enough so put it back in the queue
           Assert.assertTrue(bufferQueue.offer(ref));
           if (alreadySeen == null) {
-            alreadySeen = new IdentityHashMap<BBSoftReference, BBSoftReference>();
+            alreadySeen = new IdentityHashMap<>();
           }
           if (alreadySeen.put(ref, ref) != null) {
             // if it returns non-null then we have already seen this item
@@ -77,7 +82,7 @@ public class Buffers {
             break;
           }
         }
-        ref = (BBSoftReference) bufferQueue.poll();
+        ref = bufferQueue.poll();
       }
       result = ByteBuffer.allocateDirect(size);
     } else {
@@ -85,26 +90,26 @@ public class Buffers {
       result = ByteBuffer.allocate(size);
     }
     if (send) {
-      stats.incSenderBufferSize(size, TCPConduit.useDirectBuffers);
+      stats.incSenderBufferSize(size, useDirectBuffers);
     } else {
-      stats.incReceiverBufferSize(size, TCPConduit.useDirectBuffers);
+      stats.incReceiverBufferSize(size, useDirectBuffers);
     }
     return result;
   }
 
-  static void releaseSenderBuffer(ByteBuffer bb, DMStats stats) {
+  public static void releaseSenderBuffer(ByteBuffer bb, DMStats stats) {
     releaseBuffer(bb, stats, true);
   }
 
-  static void releaseReceiveBuffer(ByteBuffer bb, DMStats stats) {
+  public static void releaseReceiveBuffer(ByteBuffer bb, DMStats stats) {
     releaseBuffer(bb, stats, false);
   }
 
   /**
    * Releases a previously acquired buffer.
    */
-  static void releaseBuffer(ByteBuffer bb, DMStats stats, boolean send) {
-    if (TCPConduit.useDirectBuffers) {
+  private static void releaseBuffer(ByteBuffer bb, DMStats stats, boolean send) {
+    if (useDirectBuffers) {
       BBSoftReference bbRef = new BBSoftReference(bb, send);
       bufferQueue.offer(bbRef);
     } else {
@@ -117,11 +122,8 @@ public class Buffers {
   }
 
   public static void initBufferStats(DMStats stats) { // fixes 46773
-    if (TCPConduit.useDirectBuffers) {
-      @SuppressWarnings("unchecked")
-      Iterator<BBSoftReference> it = (Iterator<BBSoftReference>) bufferQueue.iterator();
-      while (it.hasNext()) {
-        BBSoftReference ref = it.next();
+    if (useDirectBuffers) {
+      for (BBSoftReference ref : bufferQueue) {
         if (ref.getBB() != null) {
           if (ref.getSend()) { // fix bug 46773
             stats.incSenderBufferSize(ref.getSize(), true);
@@ -142,7 +144,7 @@ public class Buffers {
     private int size;
     private final boolean send;
 
-    public BBSoftReference(ByteBuffer bb, boolean send) {
+    BBSoftReference(ByteBuffer bb, boolean send) {
       super(bb);
       this.size = bb.capacity();
       this.send = send;
@@ -152,7 +154,7 @@ public class Buffers {
       return this.size;
     }
 
-    public synchronized int consumeSize() {
+    synchronized int consumeSize() {
       int result = this.size;
       this.size = 0;
       return result;
@@ -163,7 +165,7 @@ public class Buffers {
     }
 
     public ByteBuffer getBB() {
-      return (ByteBuffer) super.get();
+      return super.get();
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java
similarity index 59%
copy from geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
copy to geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java
index 148c27a..8163559 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioEngine.java
@@ -12,21 +12,26 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
+package org.apache.geode.internal.net;
 
-package org.apache.geode.internal.tcp;
+import java.nio.ByteBuffer;
 
-import java.io.IOException;
-import java.net.Socket;
+/**
+ * A pass-through implementation of NioFilter. Use this if you don't need
+ * secure communications.
+ */
+public class NioEngine implements NioFilter {
+  @Override
+  public ByteBuffer wrap(ByteBuffer buffer) {
+    return buffer;
+  }
 
-public class PeerConnectionFactory {
-  /**
-   * creates a connection that we accepted (it was initiated by an explicit connect being done on
-   * the other side). We will only receive data on this socket; never send.
-   */
-  public Connection createReceiver(ConnectionTable table, Socket socket)
-      throws IOException, ConnectionException {
-    Connection connection = new Connection(table, socket);
-    connection.initReceiver();
-    return connection;
+  @Override
+  public ByteBuffer unwrap(ByteBuffer wrappedBuffer) {
+    return wrappedBuffer;
   }
+
+  @Override
+  public void close() {}
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
new file mode 100644
index 0000000..f3ada92
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Prior to transmitting a buffer or processing a received buffer
+ * a NioFilter should be called to wrap (transmit) or unwrap (received)
+ * the buffer in case SSL is being used.
+ */
+public interface NioFilter {
+
+  /** wrap bytes for transmission to another process */
+  ByteBuffer wrap(ByteBuffer buffer) throws IOException;
+
+  /** unwrap bytes received from another process */
+  ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException;
+
+  /**
+   * You must invoke this when done reading from the unwrapped buffer
+   */
+  default void doneReading(ByteBuffer unwrappedBuffer) {
+    if (unwrappedBuffer.position() != 0) {
+      unwrappedBuffer.compact();
+    } else {
+      unwrappedBuffer.position(unwrappedBuffer.limit());
+      unwrappedBuffer.limit(unwrappedBuffer.capacity());
+    }
+  }
+
+  /** invoke this method when you are done using the NioFilter */
+  void close();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
new file mode 100644
index 0000000..9267880
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.net;
+
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.geode.GemFireIOException;
+
+
+/**
+ * NioSslEngine uses an SSLEngine to bind SSL logic to a data source
+ */
+public class NioSslEngine implements NioFilter {
+  private final SSLEngine engine;
+  private final SocketChannel socketChannel;
+
+  /**
+   * myNetData holds bytes wrapped by the SSLEngine
+   */
+  private ByteBuffer myNetData;
+
+  /**
+   * peerAppData holds the last unwrapped data from a peer
+   */
+  private ByteBuffer peerAppData;
+
+  ByteBuffer myAppData;
+
+  private boolean useDirectBuffers;
+
+  public NioSslEngine(SocketChannel channel, SSLEngine engine, boolean useDirectBuffers) {
+    SSLSession session = engine.getSession();
+    int appBufferSize = session.getApplicationBufferSize();
+    int packetBufferSize = engine.getSession().getPacketBufferSize();
+    if (useDirectBuffers) {
+      this.myNetData = ByteBuffer.allocateDirect(packetBufferSize);
+      this.peerAppData = ByteBuffer.allocateDirect(appBufferSize);
+      this.myAppData = ByteBuffer.allocateDirect(appBufferSize);
+      this.useDirectBuffers = true;
+    } else {
+      this.myNetData = ByteBuffer.allocate(packetBufferSize);
+      this.peerAppData = ByteBuffer.allocate(appBufferSize);
+      this.myAppData = ByteBuffer.allocate(appBufferSize);
+    }
+    this.engine = engine;
+    this.socketChannel = channel;
+  }
+
+  /**
+   * This will throw an SSLHandshakeException if the handshake doesn't terminate in
+   * a FINISHED state. It may throw other IOExceptions caused by I/O operations
+   */
+  public NioSslEngine handshake(int timeout) throws IOException {
+    ByteBuffer peerNetData = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
+
+    long timeoutNanos = -1;
+    if (timeout > 0) {
+      timeoutNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
+    }
+
+    // Begin handshake
+    engine.beginHandshake();
+    SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus();
+
+    // Process handshaking message
+    while (hs != SSLEngineResult.HandshakeStatus.FINISHED &&
+        hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+
+      if (timeoutNanos > 0) {
+        if (timeoutNanos < System.nanoTime()) {
+          throw new SocketTimeoutException("handshake timed out");
+        }
+      }
+
+      switch (hs) {
+
+        case NEED_UNWRAP:
+          // Receive handshaking data from peer
+          if (socketChannel.read(peerNetData) < 0) {
+            // The channel has reached end-of-stream
+          }
+
+          // Process incoming handshaking data
+          peerNetData.flip();
+          SSLEngineResult res = engine.unwrap(peerNetData, peerAppData);
+          peerNetData.compact();
+          hs = res.getHandshakeStatus();
+
+          // Check status
+          switch (res.getStatus()) {
+            case BUFFER_UNDERFLOW:
+              break;
+            case BUFFER_OVERFLOW:
+              break;
+            case OK:
+              // Handle OK status
+              break;
+            case CLOSED:
+              break;
+            default:
+              throw new IllegalStateException("Unknown SSLEngineResult status: " + res.getStatus());
+          }
+          break;
+
+        case NEED_WRAP:
+          // Empty the local network packet buffer.
+          myNetData.clear();
+
+          // Generate handshaking data
+          res = engine.wrap(myAppData, myNetData);
+          hs = res.getHandshakeStatus();
+
+          // Check status
+          switch (res.getStatus()) {
+            case BUFFER_UNDERFLOW:
+              break;
+            case BUFFER_OVERFLOW:
+              break;
+            case OK:
+              myNetData.flip();
+              // Send the handshaking data to peer
+              while (myNetData.hasRemaining()) {
+                socketChannel.write(myNetData);
+              }
+              break;
+            case CLOSED:
+              break;
+            default:
+              throw new IllegalStateException("Unknown SSLEngineResult status: " + res.getStatus());
+          }
+          break;
+
+        case NOT_HANDSHAKING:
+          break;
+        case FINISHED:
+          break;
+        case NEED_TASK:
+          // Handle blocking tasks
+          handleBlockingTasks();
+          hs = engine.getHandshakeStatus();
+          break;
+        default:
+          throw new IllegalStateException("Unknown SSL Handshake state: " + hs);
+      }
+    }
+    if (!Objects.equals(SSLEngineResult.HandshakeStatus.FINISHED, hs)) {
+      throw new SSLHandshakeException("SSL Handshake terminated with status " + hs);
+    }
+    return this;
+  }
+
+  private void handleBlockingTasks() {
+    Runnable task;
+    while ((task = engine.getDelegatedTask()) != null) {
+      // these tasks could be run in other threads but the SSLEngine will block until they finish
+      task.run();
+    }
+  }
+
+  @Override
+  public ByteBuffer wrap(ByteBuffer appData) throws IOException {
+    myNetData.clear();
+
+    while (appData.hasRemaining()) {
+      // ensure we have lots of capacity since encrypted data might
+      // be larger than the app data
+      int remaining = myNetData.capacity() - myNetData.position();
+
+      if (remaining < (appData.remaining() * 2)) {
+        int newCapacity = newBufferCapacity(appData, myNetData);
+        myNetData = newBuffer(myNetData, newCapacity);
+      }
+
+      SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
+
+      if (wrapResult.getHandshakeStatus() == NEED_TASK) {
+        handleBlockingTasks();
+      }
+
+      if (wrapResult.getStatus() != SSLEngineResult.Status.OK) {
+        throw new SSLException("Error encrypting data: " + wrapResult);
+      }
+    }
+
+    myNetData.flip();
+
+    return myNetData;
+  }
+
+  @Override
+  public ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException {
+    // note that we do not clear peerAppData as it may hold a partial
+    // message. TcpConduit, for instance, uses message chunking to
+    // transmit large payloads and we may have read a partial chunk
+    // during the previous unwrap
+
+    while (wrappedBuffer.hasRemaining()) {
+      int remaining = peerAppData.capacity() - peerAppData.position();
+      if (remaining < wrappedBuffer.remaining() * 2) {
+        int newCapacity = newBufferCapacity(peerAppData, wrappedBuffer);
+        peerAppData = newBuffer(peerAppData, newCapacity);
+      }
+      SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
+      if (unwrapResult.getHandshakeStatus() == NEED_TASK) {
+        handleBlockingTasks();
+      }
+
+      if (unwrapResult.getStatus() != SSLEngineResult.Status.OK) {
+        throw new SSLException("Error decrypting data: " + unwrapResult);
+      }
+    }
+    wrappedBuffer.clear();
+    peerAppData.flip();
+    return peerAppData;
+  }
+
+  @Override
+  public void close() {
+    engine.closeOutbound();
+    try {
+      ByteBuffer empty = ByteBuffer.allocate(0);
+
+      while (!engine.isOutboundDone()) {
+        // Get close message
+        SSLEngineResult res = engine.wrap(empty, myNetData);
+
+        // Send close message to peer
+        while (myNetData.hasRemaining()) {
+          socketChannel.write(myNetData);
+          myNetData.compact();
+        }
+      }
+    } catch (IOException e) {
+      throw new GemFireIOException("exception closing SSL session", e);
+    }
+  }
+
+  private int newBufferCapacity(ByteBuffer sourceBuffer, ByteBuffer targetBuffer) {
+    return Math.max(targetBuffer.position() + sourceBuffer.remaining() * 2,
+        targetBuffer.capacity() * 2);
+  }
+
+  private ByteBuffer newBuffer(ByteBuffer existing, int desiredCapacity) {
+    ByteBuffer newBuffer;
+    if (useDirectBuffers) {
+      newBuffer = ByteBuffer.allocateDirect(desiredCapacity);
+    } else {
+      newBuffer = ByteBuffer.allocate(desiredCapacity);
+    }
+    newBuffer.clear();
+    existing.flip();
+    newBuffer.put(existing);
+    return newBuffer;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index f6dc962..4d8fc8e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -30,6 +30,7 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
@@ -50,6 +51,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.locks.LockSupport;
 
 import javax.naming.Context;
 import javax.naming.NamingEnumeration;
@@ -339,7 +341,6 @@ public class SocketCreator {
           .equals(sslConfig.getSecuredCommunicationChannel())) {
         if (this.sslConfig.isEnabled()) {
           System.setProperty("p2p.useSSL", "true");
-          System.setProperty("p2p.oldIO", "true");
           System.setProperty("p2p.nodirectBuffers", "true");
         } else {
           System.setProperty("p2p.useSSL", "false");
@@ -866,14 +867,6 @@ public class SocketCreator {
   }
 
   /**
-   * Return a client socket. This method is used by peers.
-   */
-  public Socket connectForServer(InetAddress inetadd, int port, int socketBufferSize)
-      throws IOException {
-    return connect(inetadd, port, 0, null, false, socketBufferSize);
-  }
-
-  /**
    * Return a client socket, timing out if unable to connect and timeout > 0 (millis). The parameter
    * <i>timeout</i> is ignored if SSL is being used, as there is no timeout argument in the ssl
    * socket factory
@@ -962,6 +955,37 @@ public class SocketCreator {
   }
 
   /**
+   * See
+   * https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SSLENG
+   *
+   * @param socketChannel the socket's NIO channel
+   * @param timeout handshake timeout in milliseconds. No timeout if <= 0
+   * @param clientSocket set to true if you initiated the connect(), false if you accepted it
+   * @param useDirectBuffers whether to use direct ByteBuffers
+   * @return The SSLEngine to be used in processing data for sending/receiving from the channel
+   */
+  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, int timeout,
+      boolean clientSocket,
+      boolean useDirectBuffers)
+      throws IOException {
+    SSLEngine engine = sslContext.createSSLEngine();
+    engine.setUseClientMode(clientSocket);
+    while (!socketChannel.finishConnect()) {
+      LockSupport.parkNanos(100L);
+    }
+    boolean blocking = socketChannel.isBlocking();
+    if (blocking) {
+      socketChannel.configureBlocking(false);
+    }
+    NioSslEngine nEngine =
+        new NioSslEngine(socketChannel, engine, useDirectBuffers).handshake(timeout);
+    if (blocking) {
+      socketChannel.configureBlocking(true);
+    }
+    return nEngine;
+  }
+
+  /**
    * Use this method to perform the SSL handshake on a newly accepted socket. Non-SSL
    * sockets are ignored by this method.
    *
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index f234ee7..2ce177a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -16,16 +16,11 @@ package org.apache.geode.internal.tcp;
 
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
-import java.io.OutputStream;
 import java.net.ConnectException;
-import java.net.Inet6Address;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
@@ -45,6 +40,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.net.ssl.SSLException;
+
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -69,7 +66,6 @@ import org.apache.geode.distributed.internal.direct.DirectChannel;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MembershipManager;
 import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.ByteArrayDataInput;
 import org.apache.geode.internal.DSFIDFactory;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.SystemTimer;
@@ -78,7 +74,12 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThread;
+import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.NioEngine;
+import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
 import org.apache.geode.internal.tcp.MsgReader.Header;
 import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
 
@@ -91,27 +92,25 @@ import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
 public class Connection implements Runnable {
   private static final Logger logger = LogService.getLogger();
 
-  private static final int INITIAL_CAPACITY =
-      Integer.getInteger("p2p.readerBufferSize", 32768).intValue();
   private static int P2P_CONNECT_TIMEOUT;
   private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED = false;
 
-  public static final int NORMAL_MSG_TYPE = 0x4c;
-  public static final int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
-  public static final int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
-  public static final int DIRECT_ACK_BIT = 0x20;
+  static final int NORMAL_MSG_TYPE = 0x4c;
+  static final int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
+  static final int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
+  static final int DIRECT_ACK_BIT = 0x20;
 
-  public static final int MSG_HEADER_SIZE_OFFSET = 0;
-  public static final int MSG_HEADER_TYPE_OFFSET = 4;
-  public static final int MSG_HEADER_ID_OFFSET = 5;
-  public static final int MSG_HEADER_BYTES = 7;
+  static final int MSG_HEADER_SIZE_OFFSET = 0;
+  static final int MSG_HEADER_TYPE_OFFSET = 4;
+  static final int MSG_HEADER_ID_OFFSET = 5;
+  static final int MSG_HEADER_BYTES = 7;
 
   /**
    * Small buffer used for send socket buffer on receiver connections and receive buffer on sender
    * connections.
    */
-  public static final int SMALL_BUFFER_SIZE =
-      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096).intValue();
+  static final int SMALL_BUFFER_SIZE =
+      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
 
   /** counter to give connections a unique id */
   private static AtomicLong idCounter = new AtomicLong(1);
@@ -124,6 +123,7 @@ public class Connection implements Runnable {
   private final ConnectionTable owner;
 
   private final TCPConduit conduit;
+  private NioFilter ioFilter;
 
   /**
    * Set to false once run() is terminating. Using this instead of Thread.isAlive as the reader
@@ -141,7 +141,12 @@ public class Connection implements Runnable {
   /** The idle timeout timer task for this connection */
   private SystemTimerTask idleTask;
 
-  private static final ThreadLocal isReaderThread = new ThreadLocal();
+  private static final ThreadLocal<Boolean> isReaderThread = new ThreadLocal<Boolean>() {
+    @Override
+    public Boolean initialValue() {
+      return Boolean.FALSE;
+    }
+  };
 
   public static void makeReaderThread() {
     // mark this thread as a reader thread
@@ -153,13 +158,8 @@ public class Connection implements Runnable {
   }
 
   // return true if this thread is a reader thread
-  public static boolean isReaderThread() {
-    Object o = isReaderThread.get();
-    if (o == null) {
-      return false;
-    } else {
-      return ((Boolean) o).booleanValue();
-    }
+  private static boolean isReaderThread() {
+    return isReaderThread.get();
   }
 
   private int getP2PConnectTimeout() {
@@ -182,10 +182,15 @@ public class Connection implements Runnable {
   private static final boolean DOMINO_THREAD_OWNED_SOCKETS =
       Boolean.getBoolean("p2p.ENABLE_DOMINO_THREAD_OWNED_SOCKETS");
 
-  private static final ThreadLocal isDominoThread = new ThreadLocal();
+  private static final ThreadLocal<Boolean> isDominoThread = new ThreadLocal<Boolean>() {
+    @Override
+    public Boolean initialValue() {
+      return Boolean.FALSE;
+    }
+  };
 
   // return true if this thread is a reader thread
-  public static boolean tipDomino() {
+  private static boolean tipDomino() {
     if (DOMINO_THREAD_OWNED_SOCKETS) {
       // mark this thread as one who wants to send ALL on TO sockets
       ConnectionTable.threadWantsOwnResources();
@@ -197,25 +202,17 @@ public class Connection implements Runnable {
   }
 
   public static boolean isDominoThread() {
-    Object o = isDominoThread.get();
-    if (o == null) {
-      return false;
-    } else {
-      return ((Boolean) o).booleanValue();
-    }
+    return isDominoThread.get();
   }
 
   /** the socket entrusted to this connection */
   private final Socket socket;
 
-  /** the non-NIO output stream */
-  OutputStream output;
-
   /** output stream/channel lock */
   private final Object outLock = new Object();
 
   /** the ID string of the conduit (for logging) */
-  String conduitIdStr;
+  private String conduitIdStr;
 
   /** Identifies the java group member on the other side of the connection. */
   InternalDistributedMember remoteAddr;
@@ -223,7 +220,7 @@ public class Connection implements Runnable {
   /**
    * Identifies the version of the member on the other side of the connection.
    */
-  Version remoteVersion;
+  private Version remoteVersion;
 
   /**
    * True if this connection was accepted by a listening socket. This makes it a receiver. False if
@@ -286,16 +283,16 @@ public class Connection implements Runnable {
   /**
    * Number of bytes in the outgoingQueue. Used to control capacity.
    */
-  private long queuedBytes = 0;
+  private long queuedBytes;
 
   /** used for async writes */
-  Thread pusherThread;
+  private Thread pusherThread;
 
   /**
    * The maximum number of concurrent senders sending a message to a single recipient.
    */
   private static final int MAX_SENDERS = Integer
-      .getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL).intValue();
+      .getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL);
   /**
    * This semaphore is used to throttle how many threads will try to do sends on this connection
    * concurrently. A thread must acquire this semaphore before it is allowed to start serializing
@@ -304,10 +301,10 @@ public class Connection implements Runnable {
   private final Semaphore senderSem = new ReentrantSemaphore(MAX_SENDERS);
 
   /** Set to true once the handshake has been read */
-  volatile boolean handshakeRead = false;
-  volatile boolean handshakeCancelled = false;
+  private volatile boolean handshakeRead;
+  private volatile boolean handshakeCancelled;
 
-  private volatile int replyCode = 0;
+  private volatile int replyCode;
 
   private static final byte REPLY_CODE_OK = (byte) 69;
   private static final byte REPLY_CODE_OK_WITH_ASYNC_INFO = (byte) 70;
@@ -323,7 +320,7 @@ public class Connection implements Runnable {
   /** set to true once a close begins */
   private final AtomicBoolean closing = new AtomicBoolean(false);
 
-  volatile boolean readerShuttingDown = false;
+  private volatile boolean readerShuttingDown = false;
 
   /** whether the socket is connected */
   volatile boolean connected = false;
@@ -331,10 +328,10 @@ public class Connection implements Runnable {
   /**
    * Set to true once a connection finishes its constructor
    */
-  volatile boolean finishedConnecting = false;
+  private volatile boolean finishedConnecting = false;
 
-  volatile boolean accessed = true;
-  volatile boolean socketInUse = false;
+  private volatile boolean accessed = true;
+  private volatile boolean socketInUse = false;
   volatile boolean timedOut = false;
 
   /**
@@ -346,7 +343,7 @@ public class Connection implements Runnable {
    * millisecond clock at the time message transmission started, if doing forced-disconnect
    * processing
    */
-  long transmissionStartTime;
+  private long transmissionStartTime;
 
   /** ack wait timeout - if socketInUse, use this to trigger SUSPECT processing */
   private long ackWaitTimeout;
@@ -358,38 +355,42 @@ public class Connection implements Runnable {
    * other connections participating in the current transmission. we notify them if ackSATimeout
    * expires to keep all members from generating alerts when only one is slow
    */
-  List ackConnectionGroup;
+  private List ackConnectionGroup;
 
   /** name of thread that we're currently performing an operation in (may be null) */
-  String ackThreadName;
+  private String ackThreadName;
 
-  /** the buffer used for NIO message receipt */
-  ByteBuffer nioInputBuffer;
+  /** the buffer used for message receipt */
+  private ByteBuffer inputBuffer;
 
   /** the length of the next message to be dispatched */
-  int nioMessageLength;
+  private int messageLength;
 
   /** the type of message being received */
-  byte nioMessageType;
+  private byte messageType;
+
+  /**
+   * when messages are chunked by a MsgStreamer we track the destreamers on
+   * the receiving side using a message identifier
+   */
+  private short messageId;
+
+  /** whether the length of the next message has been established */
+  private boolean lengthSet = false;
 
   /** used to lock access to destreamer data */
   private final Object destreamerLock = new Object();
 
   /** caches a msg destreamer that is currently not being used */
-  MsgDestreamer idleMsgDestreamer;
+  private MsgDestreamer idleMsgDestreamer;
 
   /**
-   * used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages using
-   * nio
+   * used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages
    */
-  HashMap destreamerMap;
-
-  boolean directAck;
+  private HashMap destreamerMap;
 
-  short nioMsgId;
+  private boolean directAck;
 
-  /** whether the length of the next message has been established */
-  boolean nioLengthSet = false;
 
   /** is this connection used for serial message delivery? */
   boolean preserveOrder = false;
@@ -422,7 +423,7 @@ public class Connection implements Runnable {
     setSocketBufferSize(sock, false, requestedSize);
   }
 
-  public int getReceiveBufferSize() {
+  int getReceiveBufferSize() {
     return recvBufferSize;
   }
 
@@ -447,7 +448,6 @@ public class Connection implements Runnable {
           } else {
             sock.setReceiveBufferSize(requestedSize);
           }
-        } else {
         }
       } catch (SocketException ignore) {
       }
@@ -461,7 +461,7 @@ public class Connection implements Runnable {
         if (actualSize < requestedSize) {
           logger.info("Socket {} is {} instead of the requested {}.",
               (send ? "send buffer size" : "receive buffer size"),
-              Integer.valueOf(actualSize), Integer.valueOf(requestedSize));
+              actualSize, requestedSize);
         } else if (actualSize > requestedSize) {
           if (logger.isTraceEnabled()) {
             logger.trace("Socket {} buffer size is {} instead of the requested {}",
@@ -488,7 +488,7 @@ public class Connection implements Runnable {
   /**
    * Returns the size of the send buffer on this connection's socket.
    */
-  public int getSendBufferSize() {
+  int getSendBufferSize() {
     int result = this.sendBufferSize;
     if (result != -1) {
       return result;
@@ -504,32 +504,13 @@ public class Connection implements Runnable {
   }
 
   /**
-   * creates a connection that we accepted (it was initiated by an explicit connect being done on
-   * the other side). We will only receive data on this socket; never send.
-   */
-  protected static Connection createReceiver(ConnectionTable table, Socket socket)
-      throws IOException, ConnectionException {
-    Connection connection = new Connection(table, socket);
-    boolean readerStarted = false;
-    try {
-      connection.startReader(table);
-      readerStarted = true;
-    } finally {
-      if (!readerStarted) {
-        connection.closeForReconnect(
-            "could not start reader thread");
-      }
-    }
-    connection.waitForHandshake();
-    connection.finishedConnecting = true;
-    return connection;
-  }
-
-  /**
-   * creates a connection that we accepted (it was initiated by an explicit connect being done on
+   * creates a "reader" connection that we accepted (it was initiated by an explicit connect being
+   * done on
    * the other side).
    */
-  protected Connection(ConnectionTable t, Socket socket) throws IOException, ConnectionException {
+  protected Connection(ConnectionTable t, Socket socket, NioFilter nioFilter)
+      throws ConnectionException {
+    this.ioFilter = nioFilter;
     if (t == null) {
       throw new IllegalArgumentException(
           "Null ConnectionTable");
@@ -552,19 +533,9 @@ public class Connection implements Runnable {
       // unable to get the settings we want. Don't log an error because it will
       // likely happen a lot
     }
-    if (!useNIO()) {
-      try {
-        // this.output = new BufferedOutputStream(socket.getOutputStream(), SMALL_BUFFER_SIZE);
-        this.output = socket.getOutputStream();
-      } catch (IOException io) {
-        logger.fatal("Unable to get P2P connection streams", io);
-        t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), null);
-        throw io;
-      }
-    }
   }
 
-  protected void initReceiver() {
+  void initReceiver() {
     this.startReader(owner);
     this.waitForHandshake();
     this.finishedConnecting = true;
@@ -578,7 +549,7 @@ public class Connection implements Runnable {
   /**
    * Returns true if an idle connection was detected.
    */
-  public boolean checkForIdleTimeout() {
+  boolean checkForIdleTimeout() {
     if (isSocketClosed()) {
       return true;
     }
@@ -630,7 +601,7 @@ public class Connection implements Runnable {
     bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
     int allocSize = bytes.length;
     ByteBuffer bb;
-    if (TCPConduit.useDirectBuffers) {
+    if (Buffers.useDirectBuffers) {
       bb = ByteBuffer.allocateDirect(allocSize);
     } else {
       bb = ByteBuffer.allocate(allocSize);
@@ -645,27 +616,27 @@ public class Connection implements Runnable {
    */
   public static final int MAX_MSG_SIZE = 0x00ffffff;
 
-  public static int calcHdrSize(int byteSize) {
+  static int calcHdrSize(int byteSize) {
     if (byteSize > MAX_MSG_SIZE) {
       throw new IllegalStateException(String.format("tcp message exceeded max size of %s",
-          Integer.valueOf(MAX_MSG_SIZE)));
+          MAX_MSG_SIZE));
     }
     int hdrSize = byteSize;
     hdrSize |= (HANDSHAKE_VERSION << 24);
     return hdrSize;
   }
 
-  public static int calcMsgByteSize(int hdrSize) {
+  static int calcMsgByteSize(int hdrSize) {
     return hdrSize & MAX_MSG_SIZE;
   }
 
-  public static byte calcHdrVersion(int hdrSize) throws IOException {
+  static byte calcHdrVersion(int hdrSize) throws IOException {
     byte ver = (byte) (hdrSize >> 24);
     if (ver != HANDSHAKE_VERSION) {
       throw new IOException(
           String.format(
               "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
-              new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(ver)}));
+              HANDSHAKE_VERSION, ver));
     }
     return ver;
   }
@@ -676,7 +647,7 @@ public class Connection implements Runnable {
     if (this.isReceiver) {
       DistributionConfig cfg = owner.getConduit().config;
       ByteBuffer bb;
-      if (useNIO() && TCPConduit.useDirectBuffers) {
+      if (Buffers.useDirectBuffers) {
         bb = ByteBuffer.allocateDirect(128);
       } else {
         bb = ByteBuffer.allocate(128);
@@ -692,30 +663,16 @@ public class Connection implements Runnable {
       Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true);
       // now set the msg length into position 0
       bb.putInt(0, calcHdrSize(bb.position() - MSG_HEADER_BYTES));
-      if (useNIO()) {
-        my_okHandshakeBuf = bb;
-        bb.flip();
-      } else {
-        my_okHandshakeBytes = new byte[bb.position()];
-        bb.flip();
-        bb.get(my_okHandshakeBytes);
-      }
+      my_okHandshakeBuf = bb;
+      bb.flip();
     } else {
       my_okHandshakeBuf = okHandshakeBuf;
       my_okHandshakeBytes = okHandshakeBytes;
     }
-    if (useNIO()) {
-      assert my_okHandshakeBuf != null;
-      synchronized (my_okHandshakeBuf) {
-        my_okHandshakeBuf.position(0);
-        nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
-      }
-    } else {
-      synchronized (outLock) {
-        assert my_okHandshakeBytes != null;
-        this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
-        this.output.flush();
-      }
+    assert my_okHandshakeBuf != null;
+    synchronized (my_okHandshakeBuf) {
+      my_okHandshakeBuf.position(0);
+      writeFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
     }
   }
 
@@ -730,7 +687,7 @@ public class Connection implements Runnable {
   // NOTICE: handshake_version should not be changed anymore. Use the gemfire
   // version transmitted with the handshake bits and handle old handshakes
   // based on that
-  public static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
+  private static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
 
   /**
    * @throws ConnectionException if the conduit has stopped
@@ -766,7 +723,7 @@ public class Connection implements Runnable {
                     String.format(
                         "Connection handshake with %s timed out after waiting %s milliseconds.",
 
-                        peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)));
+                        peerName, HANDSHAKE_TIMEOUT_MS));
               } else {
                 peerName = "socket " + this.socket.getRemoteSocketAddress().toString() + ":"
                     + this.socket.getPort();
@@ -774,7 +731,7 @@ public class Connection implements Runnable {
               throw new ConnectionException(
                   String.format(
                       "Connection handshake with %s timed out after waiting %s milliseconds.",
-                      peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)));
+                      peerName, HANDSHAKE_TIMEOUT_MS));
             } else {
               success = this.handshakeRead;
             }
@@ -828,9 +785,9 @@ public class Connection implements Runnable {
   /**
    * asynchronously close this connection
    *
-   * @param beingSick test hook to simulate sickness in communications & membership
+   * @param beingSickForTests test hook to simulate sickness in communications & membership
    */
-  private void asyncClose(boolean beingSick) {
+  private void asyncClose(boolean beingSickForTests) {
     // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
 
     // we do the close in a background thread because the operation may hang if
@@ -838,7 +795,7 @@ public class Connection implements Runnable {
 
     // if simulating sickness, sockets must be closed in-line so that tests know
     // that the vm is sick when the beSick operation completes
-    if (beingSick) {
+    if (beingSickForTests) {
       prepareForAsyncClose();
     } else {
       if (this.asyncCloseCalled.compareAndSet(false, true)) {
@@ -881,7 +838,7 @@ public class Connection implements Runnable {
     }
   }
 
-  private void handshakeNio() throws IOException {
+  private void handshakeFromNewSender() throws IOException {
     waitForAddressCompletion();
 
     InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
@@ -914,42 +871,7 @@ public class Connection implements Runnable {
     // }
     connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, ClusterDistributionManager.STANDARD_EXECUTOR,
         MsgIdGenerator.NO_MSG_ID);
-    nioWriteFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
-  }
-
-  private void handshakeStream() throws IOException {
-    waitForAddressCompletion();
-
-    this.output = getSocket().getOutputStream();
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE);
-    DataOutputStream os = new DataOutputStream(baos);
-    InternalDistributedMember myAddr = owner.getConduit().getMemberId();
-    os.writeByte(0);
-    os.writeByte(HANDSHAKE_VERSION);
-    // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
-    InternalDataSerializer.invokeToData(myAddr, os);
-    os.writeBoolean(this.sharedResource);
-    os.writeBoolean(this.preserveOrder);
-    os.writeLong(this.uniqueId);
-    Version.CURRENT.writeOrdinal(os, true);
-    os.writeInt(dominoCount.get() + 1);
-    os.flush();
-
-    byte[] msg = baos.toByteArray();
-    int len = calcHdrSize(msg.length);
-    byte[] lenbytes = new byte[MSG_HEADER_BYTES];
-    lenbytes[MSG_HEADER_SIZE_OFFSET] = (byte) ((len / 0x1000000) & 0xff);
-    lenbytes[MSG_HEADER_SIZE_OFFSET + 1] = (byte) ((len / 0x10000) & 0xff);
-    lenbytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((len / 0x100) & 0xff);
-    lenbytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (len & 0xff);
-    lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE;
-    lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
-    lenbytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
-    synchronized (outLock) {
-      this.output.write(lenbytes, 0, lenbytes.length);
-      this.output.write(msg, 0, msg.length);
-      this.output.flush();
-    }
+    writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
   }
 
   /**
@@ -958,20 +880,15 @@ public class Connection implements Runnable {
    */
   private void attemptHandshake(ConnectionTable connTable) throws IOException {
     // send HANDSHAKE
-    // send this server's port. It's expected on the other side
-    if (useNIO()) {
-      handshakeNio();
-    } else {
-      handshakeStream();
-    }
-
+    // send this member's information. It's expected on the other side
+    handshakeFromNewSender();
     startReader(connTable); // this reader only reads the handshake and then exits
     waitForHandshake(); // waiting for reply
   }
 
   /** time between connection attempts */
   private static final int RECONNECT_WAIT_TIME = Integer
-      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "RECONNECT_WAIT_TIME", 2000).intValue();
+      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "RECONNECT_WAIT_TIME", 2000);
 
   /**
    * creates a new connection to a remote server. We are initiating this connection; the other side
@@ -1106,10 +1023,7 @@ public class Connection implements Runnable {
             }
           } catch (ConnectionException e) {
             if (giveUpOnMember(mgr, remoteAddr)) {
-              IOException ioe =
-                  new IOException("Handshake failed");
-              ioe.initCause(e);
-              throw ioe;
+              throw new IOException("Handshake failed", e);
             }
             t.getConduit().getCancelCriterion().checkCancelInProgress(null);
             logger.info(
@@ -1208,80 +1122,76 @@ public class Connection implements Runnable {
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
-    if (useNIO()) {
-      SocketChannel channel = SocketChannel.open();
-      this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
+    SocketChannel channel = SocketChannel.open();
+    this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
+
+    try {
+      channel.socket().setTcpNoDelay(true);
+      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+
+      /*
+       * If conserve-sockets is false, the socket can be used for receiving responses, so set the
+       * receive buffer accordingly.
+       */
+      if (!sharedResource) {
+        setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize);
+      } else {
+        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        // receive ack messages
+      }
+      setSendBufferSize(channel.socket());
+      channel.configureBlocking(true);
+
+      int connectTime = getP2PConnectTimeout();
+
       try {
-        channel.socket().setTcpNoDelay(true);
 
-        channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+        channel.socket().connect(addr, connectTime);
 
-        /*
-         * If conserve-sockets is false, the socket can be used for receiving responses, so set the
-         * receive buffer accordingly.
-         */
-        if (!sharedResource) {
-          setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize);
+        if (TCPConduit.useSSL) {
+          SocketCreator socketCreator = SocketCreatorFactory
+              .getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
+          ioFilter = socketCreator.handshakeSSLSocketChannel(channel, 0, true,
+              Buffers.useDirectBuffers);
         } else {
-          setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
-                                                                     // receive ack messages
+          ioFilter = new NioEngine();
         }
-        setSendBufferSize(channel.socket());
-        channel.configureBlocking(true);
-
-        int connectTime = getP2PConnectTimeout();
 
+      } catch (NullPointerException e) {
+        // bug #45044 - jdk 1.7 sometimes throws an NPE here
+        ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
+        c.initCause(e);
+        // prevent a hot loop by sleeping a little bit
         try {
-          channel.socket().connect(addr, connectTime);
-        } catch (NullPointerException e) {
-          // bug #45044 - jdk 1.7 sometimes throws an NPE here
-          ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
-          c.initCause(e);
-          // prevent a hot loop by sleeping a little bit
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
-          throw c;
-        } catch (CancelledKeyException | ClosedSelectorException e) {
-          // bug #44469: for some reason NIO throws this runtime exception
-          // instead of an IOException on timeouts
-          ConnectException c = new ConnectException(
-              String.format("Attempt timed out after %s milliseconds",
-                  new Object[] {connectTime}));
-          c.initCause(e);
-          throw c;
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
         }
-      } finally {
-        this.owner.removeConnectingSocket(channel.socket());
-      }
-      this.socket = channel.socket();
-    } else {
-      if (TCPConduit.useSSL) {
-        int socketBufferSize =
-            sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
-        this.socket = owner.getConduit().getSocketCreator().connectForServer(
-            remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort(), socketBufferSize);
-        // Set the receive buffer size local fields. It has already been set in the socket.
-        setSocketBufferSize(this.socket, false, socketBufferSize, true);
-        setSendBufferSize(this.socket);
-      } else {
-        Socket s = new Socket();
-        this.socket = s;
-        s.setTcpNoDelay(true);
-        s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
-        setReceiveBufferSize(s, SMALL_BUFFER_SIZE);
-        setSendBufferSize(s);
-        s.connect(addr, 0);
+        throw c;
+      } catch (SSLException e) {
+        ConnectException c = new ConnectException("Problem connecting to peer " + addr);
+        c.initCause(e);
+        throw c;
+      } catch (CancelledKeyException | ClosedSelectorException e) {
+        // bug #44469: for some reason NIO throws this runtime exception
+        // instead of an IOException on timeouts
+        ConnectException c = new ConnectException(
+            String.format("Attempt timed out after %s milliseconds",
+                connectTime));
+        c.initCause(e);
+        throw c;
       }
+    } finally {
+      this.owner.removeConnectingSocket(channel.socket());
     }
+    this.socket = channel.socket();
+
     if (logger.isDebugEnabled()) {
       logger.debug("Connection: connected to {} with IP address {}", remoteAddr, addr);
     }
     try {
       getSocket().setTcpNoDelay(true);
-    } catch (SocketException e) {
+    } catch (SocketException ignored) {
     }
   }
 
@@ -1293,20 +1203,15 @@ public class Connection implements Runnable {
    */
   private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
   private static final int BATCH_BUFFER_SIZE =
-      Integer.getInteger("p2p.batchBufferSize", 1024 * 1024).intValue();
-  private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue();
-  private Object batchLock;
+      Integer.getInteger("p2p.batchBufferSize", 1024 * 1024);
+  private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50);
+  private final Object batchLock = new Object();
   private ByteBuffer fillBatchBuffer;
   private ByteBuffer sendBatchBuffer;
   private BatchBufferFlusher batchFlusher;
 
   private void createBatchSendBuffer() {
-    // batch send buffer isn't needed if old-io is being used
-    if (!this.useNIO) {
-      return;
-    }
-    this.batchLock = new Object();
-    if (TCPConduit.useDirectBuffers) {
+    if (Buffers.useDirectBuffers) {
       this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
       this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
     } else {
@@ -1317,7 +1222,7 @@ public class Connection implements Runnable {
     this.batchFlusher.start();
   }
 
-  public void cleanUpOnIdleTaskCancel() {
+  void cleanUpOnIdleTaskCancel() {
     // Make sure receivers are removed from the connection table, this should always be a noop, but
     // is done here as a failsafe.
     if (isReceiver) {
@@ -1331,7 +1236,7 @@ public class Connection implements Runnable {
     private DMStats stats;
 
 
-    public BatchBufferFlusher() {
+    BatchBufferFlusher() {
       setDaemon(true);
       this.stats = owner.getConduit().getStats();
     }
@@ -1339,7 +1244,7 @@ public class Connection implements Runnable {
     /**
      * Called when a message writer needs the current fillBatchBuffer flushed
      */
-    public void flushBuffer(ByteBuffer bb) {
+    void flushBuffer(ByteBuffer bb) {
       final long start = DistributionStats.getStatTime();
       try {
         synchronized (this) {
@@ -1408,7 +1313,7 @@ public class Connection implements Runnable {
                 try {
                   sendBatchBuffer.flip();
                   SocketChannel channel = getSocket().getChannel();
-                  nioWriteFully(channel, sendBatchBuffer, false, null);
+                  writeFully(channel, sendBatchBuffer, false, null);
                   sendBatchBuffer.clear();
                 } catch (IOException | ConnectionException ex) {
                   logger.fatal("Exception flushing batch send buffer: %s", ex);
@@ -1556,7 +1461,7 @@ public class Connection implements Runnable {
         }
         // make sure our socket is closed
         asyncClose(false);
-        nioLengthSet = false;
+        lengthSet = false;
       } // synchronized
 
       // moved the call to notifyHandshakeWaiter out of the above
@@ -1672,11 +1577,7 @@ public class Connection implements Runnable {
     ConnectionTable.threadWantsSharedResources();
     makeReaderThread(this.isReceiver);
     try {
-      if (useNIO()) {
-        runNioReader();
-      } else {
-        runOioReader();
-      }
+      readMessages();
     } finally {
       // bug36060: do the socket close within a finally block
       if (logger.isDebugEnabled()) {
@@ -1690,9 +1591,9 @@ public class Connection implements Runnable {
         asyncClose(false);
         this.owner.removeAndCloseThreadOwnedSockets();
       }
-      ByteBuffer tmp = this.nioInputBuffer;
+      ByteBuffer tmp = this.inputBuffer;
       if (tmp != null) {
-        this.nioInputBuffer = null;
+        this.inputBuffer = null;
         final DMStats stats = this.owner.getConduit().getStats();
         Buffers.releaseReceiveBuffer(tmp, stats);
       }
@@ -1722,9 +1623,9 @@ public class Connection implements Runnable {
     return sb.toString();
   }
 
-  private void runNioReader() {
+  private void readMessages() {
     // take a snapshot of uniqueId to detect reconnect attempts; see bug 37592
-    SocketChannel channel = null;
+    SocketChannel channel;
     try {
       channel = getSocket().getChannel();
       channel.configureBlocking(true);
@@ -1733,7 +1634,7 @@ public class Connection implements Runnable {
       // is done.
       try {
         requestClose(
-            "runNioReader caught closed channel");
+            "readMessages caught closed channel");
       } catch (Exception ignore) {
       }
       return; // exit loop and thread
@@ -1741,7 +1642,7 @@ public class Connection implements Runnable {
       if (stopped || owner.getConduit().getCancelCriterion().isCancelInProgress()) {
         try {
           requestClose(
-              "runNioReader caught shutdown");
+              "readMessages caught shutdown");
         } catch (Exception ignore) {
         }
         return; // bug37520: exit loop (and thread)
@@ -1776,6 +1677,7 @@ public class Connection implements Runnable {
           Socket s = this.socket;
           if (s != null) {
             try {
+              ioFilter.close();
               s.close();
             } catch (IOException e) {
               // don't care
@@ -1788,7 +1690,7 @@ public class Connection implements Runnable {
         }
 
         try {
-          ByteBuffer buff = getNIOBuffer();
+          ByteBuffer buff = getInputBuffer();
           synchronized (stateLock) {
             connectionState = STATE_READING;
           }
@@ -1811,7 +1713,7 @@ public class Connection implements Runnable {
             return;
           }
 
-          processNIOBuffer();
+          processInputBuffer();
           if (!this.isReceiver && (this.handshakeRead || this.handshakeCancelled)) {
             if (logger.isDebugEnabled()) {
               if (this.handshakeRead) {
@@ -1832,7 +1734,7 @@ public class Connection implements Runnable {
           try {
             requestClose(
                 String.format("CacheClosed in channel read: %s", e));
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
           return;
         } catch (ClosedChannelException e) {
@@ -1840,7 +1742,7 @@ public class Connection implements Runnable {
           try {
             requestClose(String.format("ClosedChannelException in channel read: %s",
                 e));
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
           return;
         } catch (IOException e) {
@@ -1863,7 +1765,7 @@ public class Connection implements Runnable {
           try {
             requestClose(
                 String.format("IOException in channel read: %s", e));
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
           return;
 
@@ -1876,7 +1778,7 @@ public class Connection implements Runnable {
           try {
             requestClose(
                 String.format("%s exception in channel read", e));
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
           return;
         }
@@ -1888,7 +1790,7 @@ public class Connection implements Runnable {
         }
       }
       if (logger.isDebugEnabled()) {
-        logger.debug("{} runNioReader terminated id={} from {}", p2pReaderName(), conduitIdStr,
+        logger.debug("{} readMessages terminated id={} from {}", p2pReaderName(), conduitIdStr,
             remoteAddr);
       }
     }
@@ -1910,7 +1812,7 @@ public class Connection implements Runnable {
    * checks to see if an exception should not be logged: i.e., "forcibly closed", "reset by peer",
    * or "connection reset"
    */
-  public static boolean isIgnorableIOException(Exception e) {
+  private static boolean isIgnorableIOException(Exception e) {
     if (e instanceof ClosedChannelException) {
       return true;
     }
@@ -1994,465 +1896,9 @@ public class Connection implements Runnable {
     }
   }
 
-  private void runOioReader() {
-    InputStream input = null;
-    try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Socket is of type: {}", getSocket().getClass());
-      }
-      input = new BufferedInputStream(getSocket().getInputStream(), INITIAL_CAPACITY);
-    } catch (IOException io) {
-      if (stopped || owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-        return; // bug 37520: exit run loop (and thread)
-      }
-      logger.fatal("Unable to get input stream", io);
-      stopped = true;
-    }
-
-    if (!stopped) {
-      Assert.assertTrue(owner != null,
-          "owner should not be null");
-      if (logger.isDebugEnabled()) {
-        logger.debug("Starting {}", p2pReaderName());
-      }
-    }
-
-    byte[] headerBytes = new byte[MSG_HEADER_BYTES];
-
-    final ByteArrayDataInput dis = new ByteArrayDataInput();
-    while (!stopped) {
-      try {
-        if (SystemFailure.getFailure() != null) {
-          // Allocate no objects here!
-          Socket s = this.socket;
-          if (s != null) {
-            try {
-              s.close();
-            } catch (IOException e) {
-              // don't care
-            }
-          }
-          SystemFailure.checkFailure(); // throws
-        }
-        if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-          break;
-        }
-        int len = 0;
-        if (readFully(input, headerBytes, headerBytes.length) < 0) {
-          stopped = true;
-          continue;
-        }
-        // long recvNanos = DistributionStats.getStatTime();
-        len = ((headerBytes[MSG_HEADER_SIZE_OFFSET] & 0xff) * 0x1000000)
-            + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 1] & 0xff) * 0x10000)
-            + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 2] & 0xff) * 0x100)
-            + (headerBytes[MSG_HEADER_SIZE_OFFSET + 3] & 0xff);
-        /* byte msgHdrVersion = */ calcHdrVersion(len);
-        len = calcMsgByteSize(len);
-        int msgType = headerBytes[MSG_HEADER_TYPE_OFFSET];
-        short msgId = (short) (((headerBytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
-            + (headerBytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
-        boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
-        if (myDirectAck) {
-          msgType &= ~DIRECT_ACK_BIT; // clear the bit
-        }
-        // Following validation fixes bug 31145
-        if (!validMsgType(msgType)) {
-          logger.fatal("Unknown P2P message type: {}", Integer.valueOf(msgType));
-          this.readerShuttingDown = true;
-          requestClose(String.format("Unknown P2P message type: %s",
-              Integer.valueOf(msgType)));
-          break;
-        }
-        if (logger.isTraceEnabled())
-          logger.trace("{} reading {} bytes", conduitIdStr, len);
-        byte[] bytes = new byte[len];
-        if (readFully(input, bytes, len) < 0) {
-          stopped = true;
-          continue;
-        }
-        boolean interrupted = Thread.interrupted();
-        try {
-          if (this.handshakeRead) {
-            if (msgType == NORMAL_MSG_TYPE) {
-              // DMStats stats = this.owner.getConduit().stats;
-              // long start = DistributionStats.getStatTime();
-              this.owner.getConduit().getStats().incMessagesBeingReceived(true, len);
-              dis.initialize(bytes, this.remoteVersion);
-              DistributionMessage msg = null;
-              try {
-                ReplyProcessor21.initMessageRPId();
-                long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
-                msg = (DistributionMessage) InternalDataSerializer.readDSFID(dis);
-                this.owner.getConduit().getStats().endMsgDeserialization(startSer);
-                if (dis.available() != 0) {
-                  logger.warn("Message deserialization of {} did not read {} bytes.",
-                      msg, Integer.valueOf(dis.available()));
-                }
-                // stats.incBatchCopyTime(start);
-                try {
-                  // start = DistributionStats.getStatTime();
-                  if (!dispatchMessage(msg, len, myDirectAck)) {
-                    continue;
-                  }
-                  // stats.incBatchSendTime(start);
-                } catch (MemberShunnedException e) {
-                  continue;
-                } catch (Exception de) {
-                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de); // bug
-                                                                                          // 37101
-                  logger.fatal("Error dispatching message", de);
-                }
-              } catch (VirtualMachineError err) {
-                SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
-                throw err;
-              } catch (Throwable e) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
-                SystemFailure.checkFailure();
-                // In particular I want OutOfMem to be caught here
-                if (!myDirectAck) {
-                  String reason =
-                      "Error deserializing message";
-                  sendFailureReply(ReplyProcessor21.getMessageRPId(), reason, e, myDirectAck);
-                }
-                if (e instanceof CancelException) {
-                  if (!(e instanceof CacheClosedException)) {
-                    // Just log a message if we had trouble deserializing due to
-                    // CacheClosedException; see bug 43543
-                    throw (CancelException) e;
-                  }
-                }
-                logger.fatal("Error deserializing message", e);
-                // requestClose();
-                // return;
-              } finally {
-                ReplyProcessor21.clearMessageRPId();
-              }
-            } else if (msgType == CHUNKED_MSG_TYPE) {
-              MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
-              this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, len);
-              try {
-                md.addChunk(bytes);
-              } catch (IOException ex) {
-                logger.fatal("Failed handling chunk message", ex);
-              }
-            } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {
-              MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
-              this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, len);
-              try {
-                md.addChunk(bytes);
-              } catch (IOException ex) {
-                logger.fatal("Failed handling end chunk message", ex);
-              }
-              DistributionMessage msg = null;
-              int msgLength = 0;
-              String failureMsg = null;
-              Throwable failureEx = null;
-              int rpId = 0;
-              try {
-                msg = md.getMessage();
-              } catch (ClassNotFoundException ex) {
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.warn("ClassNotFound deserializing message: {}", ex.toString());
-              } catch (IOException ex) {
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "IOException deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("IOException deserializing message", failureEx);
-              } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
-                throw ex; // caught by outer try
-              } catch (VirtualMachineError err) {
-                SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
-                throw err;
-              } catch (Throwable ex) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
-                SystemFailure.checkFailure();
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "Unexpected failure deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("Unexpected failure deserializing message",
-                    failureEx);
-              } finally {
-                msgLength = md.size();
-                releaseMsgDestreamer(msgId, md);
-              }
-              if (msg != null) {
-                try {
-                  if (!dispatchMessage(msg, msgLength, myDirectAck)) {
-                    continue;
-                  }
-                } catch (MemberShunnedException e) {
-                  continue;
-                } catch (Exception de) {
-                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
-                  logger.fatal("Error dispatching message", de);
-                } catch (ThreadDeath td) {
-                  throw td;
-                } catch (VirtualMachineError err) {
-                  SystemFailure.initiateFailure(err);
-                  // If this ever returns, rethrow the error. We're poisoned
-                  // now, so don't let this thread continue.
-                  throw err;
-                } catch (Throwable t) {
-                  // Whenever you catch Error or Throwable, you must also
-                  // catch VirtualMachineError (see above). However, there is
-                  // _still_ a possibility that you are dealing with a cascading
-                  // error condition, so you also need to check to see if the JVM
-                  // is still usable:
-                  SystemFailure.checkFailure();
-                  logger.fatal("Throwable dispatching message", t);
-                }
-              } else if (failureEx != null) {
-                sendFailureReply(rpId, failureMsg, failureEx, myDirectAck);
-              }
-            }
-          } else {
-            dis.initialize(bytes, null);
-            if (!this.isReceiver) {
-              this.replyCode = dis.readUnsignedByte();
-              if (this.replyCode != REPLY_CODE_OK
-                  && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
-                Integer replyCodeInteger = Integer.valueOf(this.replyCode);
-                String err = String.format("Unknown handshake reply code: %s",
-                    replyCodeInteger);
-
-                if (this.replyCode == 0) { // bug 37113
-                  if (logger.isDebugEnabled()) {
-                    logger.debug("{} (peer probably departed ungracefully)", err);
-                  }
-                } else {
-                  logger.fatal("Unknown handshake reply code: {}",
-                      replyCodeInteger);
-                }
-                this.readerShuttingDown = true;
-                requestClose(err);
-                break;
-              }
-              if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
-                this.asyncDistributionTimeout = dis.readInt();
-                this.asyncQueueTimeout = dis.readInt();
-                this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
-                if (this.asyncDistributionTimeout != 0) {
-                  logger.info("{} async configuration received {}.",
-                      p2pReaderName(),
-                      " asyncDistributionTimeout=" + this.asyncDistributionTimeout
-                          + " asyncQueueTimeout=" + this.asyncQueueTimeout
-                          + " asyncMaxQueueSize="
-                          + (this.asyncMaxQueueSize / (1024 * 1024)));
-                }
-                // read the product version ordinal for on-the-fly serialization
-                // transformations (for rolling upgrades)
-                this.remoteVersion = Version.readVersion(dis, true);
-              }
-              notifyHandshakeWaiter(true);
-            } else {
-              byte b = dis.readByte();
-              if (b != 0) {
-                throw new IllegalStateException(
-                    String.format(
-                        "Detected old version (pre 5.0.1) of GemFire or non-GemFire during handshake due to initial byte being %s",
-                        new Byte(b)));
-              }
-              byte handshakeByte = dis.readByte();
-              if (handshakeByte != HANDSHAKE_VERSION) {
-                throw new IllegalStateException(
-                    String.format(
-                        "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
-
-                        new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handshakeByte)}));
-              }
-              InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
-              setRemoteAddr(remote);
-              Thread.currentThread().setName(String.format("P2P message reader for %s on port %s",
-                  this.remoteAddr, this.socket.getPort()));
-              this.sharedResource = dis.readBoolean();
-              this.preserveOrder = dis.readBoolean();
-              this.uniqueId = dis.readLong();
-              // read the product version ordinal for on-the-fly serialization
-              // transformations (for rolling upgrades)
-              this.remoteVersion = Version.readVersion(dis, true);
-              int dominoNumber = 0;
-              if (this.remoteVersion == null
-                  || (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) {
-                dominoNumber = dis.readInt();
-                if (this.sharedResource) {
-                  dominoNumber = 0;
-                }
-                dominoCount.set(dominoNumber);
-                // this.senderName = dis.readUTF();
-                setThreadName(dominoNumber);
-              }
-
-              if (!this.sharedResource) {
-                if (tipDomino()) {
-                  logger
-                      .info("thread owned receiver forcing itself to send on thread owned sockets");
-                  // bug #49565 - if domino count is >= 2 use shared resources.
-                  // Also see DistributedCacheOperation#supportsDirectAck
-                } else { // if (dominoNumber < 2){
-                  ConnectionTable.threadWantsOwnResources();
-                  if (logger.isDebugEnabled()) {
-                    logger.debug(
-                        "thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets",
-                        dominoNumber);
-                  }
-                  // } else {
-                  // ConnectionTable.threadWantsSharedResources();
-                  // logger.fine("thread-owned receiver with domino count of " + dominoNumber + "
-                  // will prefer shared sockets");
-                }
-                this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
-              }
-
-              if (logger.isDebugEnabled()) {
-                logger.debug("{} remoteAddr is {} {}", p2pReaderName(), this.remoteAddr,
-                    (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
-              }
-
-              String authInit = System.getProperty(
-                  DistributionConfigImpl.SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
-              boolean isSecure = authInit != null && authInit.length() != 0;
-
-              if (isSecure) {
-                // ARB: wait till member authentication has been confirmed?
-                if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
-                  sendOKHandshakeReply(); // fix for bug 33224
-                  notifyHandshakeWaiter(true);
-                } else {
-                  // ARB: throw exception??
-                  notifyHandshakeWaiter(false);
-                  logger.warn("{} timed out during a membership check.",
-                      p2pReaderName());
-                }
-              } else {
-                sendOKHandshakeReply(); // fix for bug 33224
-                notifyHandshakeWaiter(true);
-              }
-            }
-            if (!this.isReceiver && (this.handshakeRead || this.handshakeCancelled)) {
-              if (logger.isDebugEnabled()) {
-                if (this.handshakeRead) {
-                  logger.debug("{} handshake has been read {}", p2pReaderName(), this);
-                } else {
-                  logger.debug("{} handshake has been cancelled {}", p2pReaderName(), this);
-                }
-              }
-              // Once we have read the handshake the reader can go away
-              break;
-            }
-            continue;
-          }
-        } catch (InterruptedException e) {
-          interrupted = true;
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
-          logger.fatal(String.format("%s Stray interrupt reading message", p2pReaderName()), e);
-          continue;
-        } catch (Exception ioe) {
-          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ioe); // bug 37101
-          if (!stopped) {
-            logger.fatal(String.format("%s Error reading message", p2pReaderName()), ioe);
-          }
-          continue;
-        } finally {
-          if (interrupted) {
-            Thread.currentThread().interrupt();
-          }
-        }
-      } catch (CancelException e) {
-        if (logger.isDebugEnabled()) {
-          String ccMsg = p2pReaderName() + " Cancelled: " + this;
-          if (e.getMessage() != null) {
-            ccMsg += ": " + e.getMessage();
-          }
-          logger.debug(ccMsg);
-        }
-        this.readerShuttingDown = true;
-        try {
-          requestClose(
-              String.format("CacheClosed in channel read: %s", e));
-        } catch (Exception ex) {
-        }
-        this.stopped = true;
-      } catch (IOException io) {
-        boolean closed = isSocketClosed() || "Socket closed".equalsIgnoreCase(io.getMessage()); // needed
-                                                                                                // for
-                                                                                                // Solaris
-                                                                                                // jdk
-                                                                                                // 1.4.2_08
-        if (!closed) {
-          if (logger.isDebugEnabled() && !isIgnorableIOException(io)) {
-            logger.debug("{} io exception for {}", p2pReaderName(), this, io);
-          }
-        }
-        this.readerShuttingDown = true;
-        try {
-          requestClose(String.format("IOException received: %s", io));
-        } catch (Exception ex) {
-        }
-
-        if (closed) {
-          stopped = true;
-        } else {
-          // sleep a bit to avoid a hot error loop
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-              return;
-            }
-            break;
-          }
-        }
-      } // IOException
-      catch (Exception e) {
-        if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
-          return; // bug 37101
-        }
-        if (!stopped && !(e instanceof InterruptedException)) {
-          logger.fatal(String.format("%s exception received",
-              p2pReaderName()), e);
-        }
-        if (isSocketClosed()) {
-          stopped = true;
-        } else {
-          this.readerShuttingDown = true;
-          try {
-            requestClose(String.format("%s exception received", e));
-          } catch (Exception ex) {
-          }
-
-          // sleep a bit to avoid a hot error loop
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            break;
-          }
-        }
-      }
-    }
-  }
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE")
-  int readFully(InputStream input, byte[] buffer, int len) throws IOException {
+  void readFully(InputStream input, byte[] buffer, int len) throws IOException {
     int bytesSoFar = 0;
     while (bytesSoFar < len) {
       this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -2465,9 +1911,9 @@ public class Connection implements Runnable {
           this.readerShuttingDown = true;
           try {
             requestClose("Stream read returned non-positive length");
-          } catch (Exception ex) {
+          } catch (Exception ignored) {
           }
-          return -1;
+          return;
         }
         bytesSoFar += bytesThisTime;
       } catch (InterruptedIOException io) {
@@ -2475,7 +1921,7 @@ public class Connection implements Runnable {
         this.readerShuttingDown = true;
         try {
           requestClose("Current thread interrupted");
-        } catch (Exception ex) {
+        } catch (Exception ignored) {
         }
         Thread.currentThread().interrupt();
         this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -2485,7 +1931,6 @@ public class Connection implements Runnable {
         }
       }
     } // while
-    return len;
   }
 
   /**
@@ -2494,7 +1939,7 @@ public class Connection implements Runnable {
    *
    * @throws ConnectionException if the conduit has stopped
    */
-  public void sendPreserialized(ByteBuffer buffer, boolean cacheContentChanges,
+  void sendPreserialized(ByteBuffer buffer, boolean cacheContentChanges,
       DistributionMessage msg) throws IOException, ConnectionException {
     if (!connected) {
       throw new ConnectionException(
@@ -2512,21 +1957,8 @@ public class Connection implements Runnable {
     }
     this.socketInUse = true;
     try {
-      if (useNIO()) {
-        SocketChannel channel = getSocket().getChannel();
-        nioWriteFully(channel, buffer, false, msg);
-      } else {
-        if (buffer.hasArray()) {
-          this.output.write(buffer.array(), buffer.arrayOffset(),
-              buffer.limit() - buffer.position());
-        } else {
-          byte[] bytesToWrite = getBytesToWrite(buffer);
-          synchronized (outLock) {
-            this.output.write(bytesToWrite);
-            this.output.flush();
-          }
-        }
-      }
+      SocketChannel channel = getSocket().getChannel();
+      writeFully(channel, buffer, false, msg);
       if (cacheContentChanges) {
         messagesSent++;
       }
@@ -2577,7 +2009,7 @@ public class Connection implements Runnable {
   /**
    * For testing we want to configure the connection without having to read a handshake
    */
-  protected void setSharedUnorderedForTest() {
+  void setSharedUnorderedForTest() {
     this.preserveOrder = false;
     this.sharedResource = true;
     this.handshakeRead = true;
@@ -2585,7 +2017,7 @@ public class Connection implements Runnable {
 
 
   /** ensure that a task is running to monitor transmission and reading of acks */
-  public synchronized void scheduleAckTimeouts() {
+  synchronized void scheduleAckTimeouts() {
     if (ackTimeoutTask == null) {
       final long msAW = this.owner.getDM().getConfig().getAckWaitThreshold() * 1000L;
       final long msSA = this.owner.getDM().getConfig().getAckSevereAlertThreshold() * 1000L;
@@ -2649,11 +2081,11 @@ public class Connection implements Runnable {
   }
 
   /** ack-wait-threshold and ack-severe-alert-threshold processing */
-  protected boolean doSevereAlertProcessing() {
+  private boolean doSevereAlertProcessing() {
     long now = System.currentTimeMillis();
     if (ackSATimeout > 0 && (transmissionStartTime + ackWaitTimeout + ackSATimeout) <= now) {
       logger.fatal("{} seconds have elapsed waiting for a response from {} for thread {}",
-          Long.valueOf((ackWaitTimeout + ackSATimeout) / 1000),
+          (ackWaitTimeout + ackSATimeout) / 1000L,
           getRemoteAddress(),
           ackThreadName);
       // turn off subsequent checks by setting the timeout to zero, then boot the member
@@ -2662,7 +2094,7 @@ public class Connection implements Runnable {
     } else if (!ackTimedOut && (0 < ackWaitTimeout)
         && (transmissionStartTime + ackWaitTimeout) <= now) {
       logger.warn("{} seconds have elapsed waiting for a response from {} for thread {}",
-          Long.valueOf(ackWaitTimeout / 1000), getRemoteAddress(), ackThreadName);
+          ackWaitTimeout / 1000L, getRemoteAddress(), ackThreadName);
       ackTimedOut = true;
 
       final String state = (connectionState == Connection.STATE_SENDING)
@@ -2676,12 +2108,6 @@ public class Connection implements Runnable {
     return false;
   }
 
-  private static byte[] getBytesToWrite(ByteBuffer buffer) {
-    byte[] bytesToWrite = new byte[buffer.limit()];
-    buffer.get(bytesToWrite);
-    return bytesToWrite;
-  }
-
   private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
       throws ConnectionException {
     final DMStats stats = this.owner.getConduit().getStats();
@@ -2806,20 +2232,20 @@ public class Connection implements Runnable {
     if (!addToQueue(buffer, msg, true)) {
       return false;
     } else {
-      startNioPusher();
+      startMessagePusher();
       return true;
     }
   }
 
-  private final Object nioPusherSync = new Object();
+  private final Object pusherSync = new Object();
 
-  private void startNioPusher() {
-    synchronized (this.nioPusherSync) {
+  private void startMessagePusher() {
+    synchronized (this.pusherSync) {
       while (this.pusherThread != null) {
         // wait for previous pusher thread to exit
         boolean interrupted = Thread.interrupted();
         try {
-          this.nioPusherSync.wait(); // spurious wakeup ok
+          this.pusherSync.wait(); // spurious wakeup ok
         } catch (InterruptedException ex) {
           interrupted = true;
           this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
@@ -2831,7 +2257,7 @@ public class Connection implements Runnable {
       }
       this.asyncQueuingInProgress = true;
       this.pusherThread =
-          new LoggingThread("P2P async pusher to " + this.remoteAddr, this::runNioPusher);
+          new LoggingThread("P2P async pusher to " + this.remoteAddr, this::runMessagePusher);
     } // synchronized
     this.pusherThread.start();
   }
@@ -2937,7 +2363,7 @@ public class Connection implements Runnable {
   /**
    * have the pusher thread check for queue overflow and for idle time exceeded
    */
-  protected void runNioPusher() {
+  private void runMessagePusher() {
     try {
       final DMStats stats = this.owner.getConduit().getStats();
       final long threadStart = stats.startAsyncThread();
@@ -2983,7 +2409,7 @@ public class Connection implements Runnable {
                 }
                 return;
               }
-              nioWriteFully(channel, bb, true, null);
+              writeFully(channel, bb, true, null);
               // We should not add messagesSent here according to Bruce.
               // The counts are increased elsewhere.
               // messagesSent++;
@@ -3017,7 +2443,7 @@ public class Connection implements Runnable {
         }
       } catch (CancelException ex) { // bug 37367
         final String err = String.format("P2P pusher %s caught CacheClosedException: %s",
-            new Object[] {this, ex});
+            this, ex);
         logger.debug(err);
         try {
           requestClose(err);
@@ -3040,14 +2466,14 @@ public class Connection implements Runnable {
         stats.incAsyncThreads(-1);
         stats.incAsyncQueues(-1);
         if (logger.isDebugEnabled()) {
-          logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteAddr,
+          logger.debug("runMessagePusher terminated id={} from {}/{}", conduitIdStr, remoteAddr,
               remoteAddr);
         }
       }
     } finally {
-      synchronized (this.nioPusherSync) {
+      synchronized (this.pusherSync) {
         this.pusherThread = null;
-        this.nioPusherSync.notify();
+        this.pusherSync.notify();
       }
     }
   }
@@ -3121,6 +2547,7 @@ public class Connection implements Runnable {
         long queueTimeoutTarget = now + this.asyncQueueTimeout;
         channel.configureBlocking(false);
         try {
+          ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
           do {
             this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
             retries++;
@@ -3128,7 +2555,7 @@ public class Connection implements Runnable {
             if (FORCE_ASYNC_QUEUE) {
               amtWritten = 0;
             } else {
-              amtWritten = channel.write(buffer);
+              amtWritten = channel.write(wrappedBuffer);
             }
             if (amtWritten == 0) {
               now = System.currentTimeMillis();
@@ -3155,7 +2582,7 @@ public class Connection implements Runnable {
                     // the partial msg a candidate for conflation.
                     msg = null;
                   }
-                  if (handleBlockedWrite(buffer, msg)) {
+                  if (handleBlockedWrite(wrappedBuffer, msg)) {
                     return;
                   }
                 }
@@ -3166,8 +2593,8 @@ public class Connection implements Runnable {
                 if (curQueuedBytes > this.asyncMaxQueueSize) {
                   logger.warn(
                       "Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
-                      Long.valueOf(curQueuedBytes),
-                      Long.valueOf(this.asyncMaxQueueSize), this.remoteAddr);
+                      curQueuedBytes,
+                      this.asyncMaxQueueSize, this.remoteAddr);
                   stats.incAsyncQueueSizeExceeded(1);
                   disconnectNeeded = true;
                 }
@@ -3178,8 +2605,8 @@ public class Connection implements Runnable {
                   blockedMs += this.asyncQueueTimeout;
                   logger.warn(
                       "Blocked for {}ms which is longer than the max of {}ms, asking slow receiver {} to disconnect.",
-                      Long.valueOf(blockedMs),
-                      Integer.valueOf(this.asyncQueueTimeout), this.remoteAddr);
+                      blockedMs,
+                      this.asyncQueueTimeout, this.remoteAddr);
                   stats.incAsyncQueueTimeouts(1);
                   disconnectNeeded = true;
                 }
@@ -3229,7 +2656,7 @@ public class Connection implements Runnable {
               queueTimeoutTarget = System.currentTimeMillis() + this.asyncQueueTimeout;
               waitTime = 1;
             }
-          } while (buffer.remaining() > 0);
+          } while (wrappedBuffer.remaining() > 0);
         } finally {
           channel.configureBlocking(true);
         }
@@ -3245,12 +2672,12 @@ public class Connection implements Runnable {
   }
 
   /**
-   * nioWriteFully implements a blocking write on a channel that is in non-blocking mode.
+   * writeFully implements a blocking write on a channel that is in non-blocking mode.
    *
    * @param forceAsync true if we need to force a blocking async write.
    * @throws ConnectionException if the conduit has stopped
    */
-  protected void nioWriteFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
+  void writeFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
       DistributionMessage msg) throws IOException, ConnectionException {
     final DMStats stats = this.owner.getConduit().getStats();
     if (!this.sharedResource) {
@@ -3272,17 +2699,18 @@ public class Connection implements Runnable {
           }
           // fall through
         }
+        ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
         do {
           int amtWritten = 0;
           long start = stats.startSocketWrite(true);
           try {
             // this.writerThread = Thread.currentThread();
-            amtWritten = channel.write(buffer);
+            amtWritten = channel.write(wrappedBuffer);
           } finally {
             stats.endSocketWrite(true, start, amtWritten, 0);
             // this.writerThread = null;
           }
-        } while (buffer.remaining() > 0);
+        } while (wrappedBuffer.remaining() > 0);
       } // synchronized
     } else {
       writeAsync(channel, buffer, forceAsync, msg, stats);
@@ -3290,16 +2718,16 @@ public class Connection implements Runnable {
   }
 
   /** gets the buffer for receiving message length bytes */
-  protected ByteBuffer getNIOBuffer() {
+  private ByteBuffer getInputBuffer() {
     final DMStats stats = this.owner.getConduit().getStats();
-    if (nioInputBuffer == null) {
+    if (inputBuffer == null) {
       int allocSize = this.recvBufferSize;
       if (allocSize == -1) {
         allocSize = this.owner.getConduit().tcpBufferSize;
       }
-      nioInputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
+      inputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
     }
-    return nioInputBuffer;
+    return inputBuffer;
   }
 
   /**
@@ -3312,30 +2740,28 @@ public class Connection implements Runnable {
 
   /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
   /** the connection is idle, but may be in use */
-  protected static final byte STATE_IDLE = 0;
+  private static final byte STATE_IDLE = 0;
   /** the connection is in use and is transmitting data */
-  protected static final byte STATE_SENDING = 1;
+  private static final byte STATE_SENDING = 1;
   /** the connection is in use and is done transmitting */
-  protected static final byte STATE_POST_SENDING = 2;
+  private static final byte STATE_POST_SENDING = 2;
   /** the connection is in use and is reading a direct-ack */
-  protected static final byte STATE_READING_ACK = 3;
+  private static final byte STATE_READING_ACK = 3;
   /** the connection is in use and has finished reading a direct-ack */
-  protected static final byte STATE_RECEIVED_ACK = 4;
+  private static final byte STATE_RECEIVED_ACK = 4;
   /** the connection is in use and is reading a message */
-  protected static final byte STATE_READING = 5;
+  private static final byte STATE_READING = 5;
   /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
 
   /** set to true if we exceeded the ack-wait-threshold waiting for a response */
-  protected volatile boolean ackTimedOut;
+  private volatile boolean ackTimedOut;
 
   /**
-   * @param msToWait number of milliseconds to wait for an ack. If 0 then wait forever.
-   * @param msInterval interval between checks
-   * @throws SocketTimeoutException if msToWait expires.
-   * @throws ConnectionException if ack is not received (fixes bug 34312)
+   * @throws SocketTimeoutException if wait expires.
+   * @throws ConnectionException if ack is not received
    */
-  public void readAck(final int msToWait, final long msInterval,
-      final DirectReplyProcessor processor) throws SocketTimeoutException, ConnectionException {
+  public void readAck(final DirectReplyProcessor processor)
+      throws SocketTimeoutException, ConnectionException {
     if (isSocketClosed()) {
       throw new ConnectionException(
           "connection is closed");
@@ -3350,11 +2776,7 @@ public class Connection implements Runnable {
     DMStats stats = owner.getConduit().getStats();
     final Version version = getRemoteVersion();
     try {
-      if (useNIO()) {
-        msgReader = new NIOMsgReader(this, version);
-      } else {
-        msgReader = new OioMsgReader(this, version);
-      }
+      msgReader = new NIOMsgReader(this, version);
 
       Header header = msgReader.readHeader();
 
@@ -3442,93 +2864,83 @@ public class Connection implements Runnable {
    * processes the current NIO buffer. If there are complete messages in the buffer, they are
    * deserialized and passed to TCPConduit for further processing
    */
-  private void processNIOBuffer() throws ConnectionException, IOException {
-    if (nioInputBuffer != null) {
-      nioInputBuffer.flip();
-    }
+  private void processInputBuffer() throws ConnectionException, IOException {
+    inputBuffer.flip();
+    ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+
     boolean done = false;
 
     while (!done && connected) {
       this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
       // long startTime = DistributionStats.getStatTime();
-      int remaining = nioInputBuffer.remaining();
-      if (nioLengthSet || remaining >= MSG_HEADER_BYTES) {
-        if (!nioLengthSet) {
-          int headerStartPos = nioInputBuffer.position();
-          nioMessageLength = nioInputBuffer.getInt();
-          /* nioMessageVersion = */ calcHdrVersion(nioMessageLength);
-          nioMessageLength = calcMsgByteSize(nioMessageLength);
-          nioMessageType = nioInputBuffer.get();
-          nioMsgId = nioInputBuffer.getShort();
-          directAck = (nioMessageType & DIRECT_ACK_BIT) != 0;
-          if (directAck) {
-            nioMessageType &= ~DIRECT_ACK_BIT; // clear the ack bit
-          }
-          // Following validation fixes bug 31145
-          if (!validMsgType(nioMessageType)) {
-            Integer nioMessageTypeInteger = Integer.valueOf(nioMessageType);
-            logger.fatal("Unknown P2P message type: {}", nioMessageTypeInteger);
-            this.readerShuttingDown = true;
-            requestClose(String.format("Unknown P2P message type: %s",
-                nioMessageTypeInteger));
-            break;
-          }
-          nioLengthSet = true;
-          // keep the header "in" the buffer until we have read the entire msg.
-          // Trust me: this will reduce copying on large messages.
-          nioInputBuffer.position(headerStartPos);
-        }
-        if (remaining >= nioMessageLength + MSG_HEADER_BYTES) {
-          nioLengthSet = false;
-          nioInputBuffer.position(nioInputBuffer.position() + MSG_HEADER_BYTES);
-          // don't trust the message deserialization to leave the position in
-          // the correct spot. Some of the serialization uses buffered
-          // streams that can leave the position at the wrong spot
-          int startPos = nioInputBuffer.position();
-          int oldLimit = nioInputBuffer.limit();
-          nioInputBuffer.limit(startPos + nioMessageLength);
-          if (this.handshakeRead) {
-            if (nioMessageType == NORMAL_MSG_TYPE) {
-              this.owner.getConduit().getStats().incMessagesBeingReceived(true, nioMessageLength);
-              ByteBufferInputStream bbis =
-                  remoteVersion == null ? new ByteBufferInputStream(nioInputBuffer)
-                      : new VersionedByteBufferInputStream(nioInputBuffer, remoteVersion);
-              DistributionMessage msg = null;
+      int remaining = peerDataBuffer.remaining();
+      if (!(lengthSet || remaining >= MSG_HEADER_BYTES)) {
+        done = true;
+        ioFilter.doneReading(peerDataBuffer);
+        break;
+      }
+      if (!lengthSet) {
+        int headerStartPos = peerDataBuffer.position();
+        messageLength = peerDataBuffer.getInt();
+        /* nioMessageVersion = */ calcHdrVersion(messageLength);
+        messageLength = calcMsgByteSize(messageLength);
+        messageType = peerDataBuffer.get();
+        messageId = peerDataBuffer.getShort();
+        directAck = (messageType & DIRECT_ACK_BIT) != 0;
+        if (directAck) {
+          messageType &= ~DIRECT_ACK_BIT; // clear the ack bit
+        }
+        // Following validation fixes bug 31145
+        if (!validMsgType(messageType)) {
+          Integer nioMessageTypeInteger = (int) messageType;
+          logger.fatal("Unknown P2P message type: {}", nioMessageTypeInteger);
+          this.readerShuttingDown = true;
+          requestClose(String.format("Unknown P2P message type: %s",
+              nioMessageTypeInteger));
+          break;
+        }
+        lengthSet = true;
+        // keep the header "in" the buffer until we have read the entire msg.
+        // Trust me: this will reduce copying on large messages.
+        peerDataBuffer.position(headerStartPos);
+      }
+      if (remaining >= messageLength + MSG_HEADER_BYTES) {
+        lengthSet = false;
+        peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
+        // don't trust the message deserialization to leave the position in
+        // the correct spot. Some of the serialization uses buffered
+        // streams that can leave the position at the wrong spot
+        int startPos = peerDataBuffer.position();
+        int oldLimit = peerDataBuffer.limit();
+        peerDataBuffer.limit(startPos + messageLength);
+        if (this.handshakeRead) {
+          if (messageType == NORMAL_MSG_TYPE) {
+            this.owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
+            ByteBufferInputStream bbis =
+                remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
+                    : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion);
+            DistributionMessage msg;
+            try {
+              ReplyProcessor21.initMessageRPId();
+              // add serialization stats
+              long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
+              msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
+              this.owner.getConduit().getStats().endMsgDeserialization(startSer);
+              if (bbis.available() != 0) {
+                logger.warn("Message deserialization of {} did not read {} bytes.",
+                    msg, bbis.available());
+              }
               try {
-                ReplyProcessor21.initMessageRPId();
-                // add serialization stats
-                long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
-                msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
-                this.owner.getConduit().getStats().endMsgDeserialization(startSer);
-                if (bbis.available() != 0) {
-                  logger.warn("Message deserialization of {} did not read {} bytes.",
-                      msg, Integer.valueOf(bbis.available()));
-                }
-                try {
-                  if (!dispatchMessage(msg, nioMessageLength, directAck)) {
-                    directAck = false;
-                  }
-                } catch (MemberShunnedException e) {
-                  directAck = false; // don't respond (bug39117)
-                } catch (Exception de) {
-                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
-                  logger.fatal("Error dispatching message", de);
-                } catch (ThreadDeath td) {
-                  throw td;
-                } catch (VirtualMachineError err) {
-                  SystemFailure.initiateFailure(err);
-                  // If this ever returns, rethrow the error. We're poisoned
-                  // now, so don't let this thread continue.
-                  throw err;
-                } catch (Throwable t) {
-                  // Whenever you catch Error or Throwable, you must also
-                  // catch VirtualMachineError (see above). However, there is
-                  // _still_ a possibility that you are dealing with a cascading
-                  // error condition, so you also need to check to see if the JVM
-                  // is still usable:
-                  SystemFailure.checkFailure();
-                  logger.fatal("Throwable dispatching message", t);
+                if (!dispatchMessage(msg, messageLength, directAck)) {
+                  directAck = false;
                 }
+              } catch (MemberShunnedException e) {
+                directAck = false; // don't respond (bug39117)
+              } catch (Exception de) {
+                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
+                logger.fatal("Error dispatching message", de);
+              } catch (ThreadDeath td) {
+                throw td;
               } catch (VirtualMachineError err) {
                 SystemFailure.initiateFailure(err);
                 // If this ever returns, rethrow the error. We're poisoned
@@ -3541,151 +2953,116 @@ public class Connection implements Runnable {
                 // error condition, so you also need to check to see if the JVM
                 // is still usable:
                 SystemFailure.checkFailure();
-                sendFailureReply(ReplyProcessor21.getMessageRPId(),
-                    "Error deserializing message", t,
-                    directAck);
-                if (t instanceof ThreadDeath) {
-                  throw (ThreadDeath) t;
-                }
-                if (t instanceof CancelException) {
-                  if (!(t instanceof CacheClosedException)) {
-                    // Just log a message if we had trouble deserializing due to
-                    // CacheClosedException; see bug 43543
-                    throw (CancelException) t;
-                  }
-                }
-                logger.fatal("Error deserializing message", t);
-              } finally {
-                ReplyProcessor21.clearMessageRPId();
+                logger.fatal("Throwable dispatching message", t);
               }
-            } else if (nioMessageType == CHUNKED_MSG_TYPE) {
-              MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion);
-              this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
-                  nioMessageLength);
-              try {
-                md.addChunk(nioInputBuffer, nioMessageLength);
-              } catch (IOException ex) {
-                logger.fatal("Failed handling chunk message", ex);
-              }
-            } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
-              // logger.info("END_CHUNK msgId="+nioMsgId);
-              MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion);
-              this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
-                  nioMessageLength);
-              try {
-                md.addChunk(nioInputBuffer, nioMessageLength);
-              } catch (IOException ex) {
-                logger.fatal("Failed handling end chunk message", ex);
+            } catch (VirtualMachineError err) {
+              SystemFailure.initiateFailure(err);
+              // If this ever returns, rethrow the error. We're poisoned
+              // now, so don't let this thread continue.
+              throw err;
+            } catch (Throwable t) {
+              // Whenever you catch Error or Throwable, you must also
+              // catch VirtualMachineError (see above). However, there is
+              // _still_ a possibility that you are dealing with a cascading
+              // error condition, so you also need to check to see if the JVM
+              // is still usable:
+              SystemFailure.checkFailure();
+              sendFailureReply(ReplyProcessor21.getMessageRPId(),
+                  "Error deserializing message", t,
+                  directAck);
+              if (t instanceof ThreadDeath) {
+                throw (ThreadDeath) t;
               }
-              DistributionMessage msg = null;
-              int msgLength = 0;
-              String failureMsg = null;
-              Throwable failureEx = null;
-              int rpId = 0;
-              boolean interrupted = false;
-              try {
-                msg = md.getMessage();
-              } catch (ClassNotFoundException ex) {
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "ClassNotFound deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("ClassNotFound deserializing message: {}", ex.toString());
-              } catch (IOException ex) {
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "IOException deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("IOException deserializing message", failureEx);
-              } catch (InterruptedException ex) {
-                interrupted = true;
-                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
-              } catch (VirtualMachineError err) {
-                SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
-                throw err;
-              } catch (Throwable ex) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
-                SystemFailure.checkFailure();
-                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
-                this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
-                failureMsg = "Unexpected failure deserializing message";
-                failureEx = ex;
-                rpId = md.getRPid();
-                logger.fatal("Unexpected failure deserializing message",
-                    failureEx);
-              } finally {
-                msgLength = md.size();
-                releaseMsgDestreamer(nioMsgId, md);
-                if (interrupted) {
-                  Thread.currentThread().interrupt();
+              if (t instanceof CancelException) {
+                if (!(t instanceof CacheClosedException)) {
+                  // Just log a message if we had trouble deserializing due to
+                  // CacheClosedException; see bug 43543
+                  throw (CancelException) t;
                 }
               }
-              if (msg != null) {
-                try {
-                  if (!dispatchMessage(msg, msgLength, directAck)) {
-                    directAck = false;
-                  }
-                } catch (MemberShunnedException e) {
-                  // not a member anymore - don't reply
-                  directAck = false;
-                } catch (Exception de) {
-                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
-                  logger.fatal("Error dispatching message", de);
-                } catch (ThreadDeath td) {
-                  throw td;
-                } catch (VirtualMachineError err) {
-                  SystemFailure.initiateFailure(err);
-                  // If this ever returns, rethrow the error. We're poisoned
-                  // now, so don't let this thread continue.
-                  throw err;
-                } catch (Throwable t) {
-                  // Whenever you catch Error or Throwable, you must also
-                  // catch VirtualMachineError (see above). However, there is
-                  // _still_ a possibility that you are dealing with a cascading
-                  // error condition, so you also need to check to see if the JVM
-                  // is still usable:
-                  SystemFailure.checkFailure();
-                  logger.fatal("Throwable dispatching message", t);
-                }
-              } else if (failureEx != null) {
-                sendFailureReply(rpId, failureMsg, failureEx, directAck);
+              logger.fatal("Error deserializing message", t);
+            } finally {
+              ReplyProcessor21.clearMessageRPId();
+            }
+          } else if (messageType == CHUNKED_MSG_TYPE) {
+            MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
+            this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
+                messageLength);
+            try {
+              md.addChunk(peerDataBuffer, messageLength);
+            } catch (IOException ex) {
+              logger.fatal("Failed handling chunk message", ex);
+            }
+          } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
+            // logger.info("END_CHUNK msgId="+nioMsgId);
+            MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
+            this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
+                messageLength);
+            try {
+              md.addChunk(peerDataBuffer, messageLength);
+            } catch (IOException ex) {
+              logger.fatal("Failed handling end chunk message", ex);
+            }
+            DistributionMessage msg = null;
+            int msgLength;
+            String failureMsg = null;
+            Throwable failureEx = null;
+            int rpId = 0;
+            boolean interrupted = false;
+            try {
+              msg = md.getMessage();
+            } catch (ClassNotFoundException ex) {
+              this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+              failureMsg = "ClassNotFound deserializing message";
+              failureEx = ex;
+              rpId = md.getRPid();
+              logger.fatal("ClassNotFound deserializing message: {}", ex.toString());
+            } catch (IOException ex) {
+              this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+              failureMsg = "IOException deserializing message";
+              failureEx = ex;
+              rpId = md.getRPid();
+              logger.fatal("IOException deserializing message", failureEx);
+            } catch (InterruptedException ex) {
+              interrupted = true;
+              this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+            } catch (VirtualMachineError err) {
+              SystemFailure.initiateFailure(err);
+              // If this ever returns, rethrow the error. We're poisoned
+              // now, so don't let this thread continue.
+              throw err;
+            } catch (Throwable ex) {
+              // Whenever you catch Error or Throwable, you must also
+              // catch VirtualMachineError (see above). However, there is
+              // _still_ a possibility that you are dealing with a cascading
+              // error condition, so you also need to check to see if the JVM
+              // is still usable:
+              SystemFailure.checkFailure();
+              this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+              this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+              failureMsg = "Unexpected failure deserializing message";
+              failureEx = ex;
+              rpId = md.getRPid();
+              logger.fatal("Unexpected failure deserializing message",
+                  failureEx);
+            } finally {
+              msgLength = md.size();
+              releaseMsgDestreamer(messageId, md);
+              if (interrupted) {
+                Thread.currentThread().interrupt();
               }
             }
-          } else {
-            // read HANDSHAKE
-            ByteBufferInputStream bbis = new ByteBufferInputStream(nioInputBuffer);
-            DataInputStream dis = new DataInputStream(bbis);
-            if (!this.isReceiver) {
+            if (msg != null) {
               try {
-                this.replyCode = dis.readUnsignedByte();
-                if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
-                  this.asyncDistributionTimeout = dis.readInt();
-                  this.asyncQueueTimeout = dis.readInt();
-                  this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
-                  if (this.asyncDistributionTimeout != 0) {
-                    logger.info("{} async configuration received {}.",
-                        p2pReaderName(),
-                        " asyncDistributionTimeout=" + this.asyncDistributionTimeout
-                            + " asyncQueueTimeout=" + this.asyncQueueTimeout
-                            + " asyncMaxQueueSize="
-                            + (this.asyncMaxQueueSize / (1024 * 1024)));
-                  }
-                  // read the product version ordinal for on-the-fly serialization
-                  // transformations (for rolling upgrades)
-                  this.remoteVersion = Version.readVersion(dis, true);
+                if (!dispatchMessage(msg, msgLength, directAck)) {
+                  directAck = false;
                 }
-              } catch (Exception e) {
-                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
-                logger.fatal("Error deserializing P2P handshake reply", e);
-                this.readerShuttingDown = true;
-                requestClose("Error deserializing P2P handshake reply");
-                return;
+              } catch (MemberShunnedException e) {
+                // not a member anymore - don't reply
+                directAck = false;
+              } catch (Exception de) {
+                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
+                logger.fatal("Error dispatching message", de);
               } catch (ThreadDeath td) {
                 throw td;
               } catch (VirtualMachineError err) {
@@ -3700,154 +3077,198 @@ public class Connection implements Runnable {
                 // error condition, so you also need to check to see if the JVM
                 // is still usable:
                 SystemFailure.checkFailure();
-                logger.fatal("Throwable deserializing P2P handshake reply",
-                    t);
-                this.readerShuttingDown = true;
-                requestClose("Throwable deserializing P2P handshake reply");
-                return;
+                logger.fatal("Throwable dispatching message", t);
               }
-              if (this.replyCode != REPLY_CODE_OK
-                  && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
-                String err =
-                    "Unknown handshake reply code: %s nioMessageLength: %s";
-                Object[] errArgs = new Object[] {Integer.valueOf(this.replyCode),
-                    Integer.valueOf(nioMessageLength)};
-                if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
-                  logger.debug(
-                      String.format(err, errArgs) + " (peer probably departed ungracefully)");
-                } else {
-                  logger.fatal(err, errArgs);
-                }
-                this.readerShuttingDown = true;
-                requestClose(String.format(err, errArgs));
-                return;
-              }
-              notifyHandshakeWaiter(true);
-            } else {
-              try {
-                byte b = dis.readByte();
-                if (b != 0) {
-                  throw new IllegalStateException(
-                      String.format(
-                          "Detected old version (pre 5.0.1) of GemFire or non-GemFire during handshake due to initial byte being %s",
-                          new Byte(b)));
-                }
-                byte handshakeByte = dis.readByte();
-                if (handshakeByte != HANDSHAKE_VERSION) {
-                  throw new IllegalStateException(
-                      String.format(
-                          "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
-
-                          new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handshakeByte)}));
+            } else if (failureEx != null) {
+              sendFailureReply(rpId, failureMsg, failureEx, directAck);
+            }
+          }
+        } else {
+          // read HANDSHAKE ---------------------------------------------
+          ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
+          DataInputStream dis = new DataInputStream(bbis);
+          if (!this.isReceiver) {
+            try {
+              this.replyCode = dis.readUnsignedByte();
+              if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
+                this.asyncDistributionTimeout = dis.readInt();
+                this.asyncQueueTimeout = dis.readInt();
+                this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
+                if (this.asyncDistributionTimeout != 0) {
+                  logger.info("{} async configuration received {}.",
+                      p2pReaderName(),
+                      " asyncDistributionTimeout=" + this.asyncDistributionTimeout
+                          + " asyncQueueTimeout=" + this.asyncQueueTimeout
+                          + " asyncMaxQueueSize="
+                          + (this.asyncMaxQueueSize / (1024 * 1024)));
                 }
-                InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
-                setRemoteAddr(remote);
-                this.sharedResource = dis.readBoolean();
-                this.preserveOrder = dis.readBoolean();
-                this.uniqueId = dis.readLong();
                 // read the product version ordinal for on-the-fly serialization
                 // transformations (for rolling upgrades)
                 this.remoteVersion = Version.readVersion(dis, true);
-                int dominoNumber = 0;
-                if (this.remoteVersion == null
-                    || (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) {
-                  dominoNumber = dis.readInt();
-                  if (this.sharedResource) {
-                    dominoNumber = 0;
-                  }
-                  dominoCount.set(dominoNumber);
-                  // this.senderName = dis.readUTF();
+              }
+            } catch (Exception e) {
+              this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+              logger.fatal("Error deserializing P2P handshake reply", e);
+              this.readerShuttingDown = true;
+              requestClose("Error deserializing P2P handshake reply");
+              return;
+            } catch (ThreadDeath td) {
+              throw td;
+            } catch (VirtualMachineError err) {
+              SystemFailure.initiateFailure(err);
+              // If this ever returns, rethrow the error. We're poisoned
+              // now, so don't let this thread continue.
+              throw err;
+            } catch (Throwable t) {
+              // Whenever you catch Error or Throwable, you must also
+              // catch VirtualMachineError (see above). However, there is
+              // _still_ a possibility that you are dealing with a cascading
+              // error condition, so you also need to check to see if the JVM
+              // is still usable:
+              SystemFailure.checkFailure();
+              logger.fatal("Throwable deserializing P2P handshake reply",
+                  t);
+              this.readerShuttingDown = true;
+              requestClose("Throwable deserializing P2P handshake reply");
+              return;
+            }
+            if (this.replyCode != REPLY_CODE_OK
+                && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
+              String err =
+                  "Unknown handshake reply code: %s nioMessageLength: %s";
+              Object[] errArgs = new Object[] {this.replyCode,
+                  messageLength};
+              if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
+                logger.debug(
+                    String.format(err, errArgs) + " (peer probably departed ungracefully)");
+              } else {
+                logger.fatal(err, errArgs);
+              }
+              this.readerShuttingDown = true;
+              requestClose(String.format(err, errArgs));
+              return;
+            }
+            notifyHandshakeWaiter(true);
+          } else {
+            try {
+              byte b = dis.readByte();
+              if (b != 0) {
+                throw new IllegalStateException(
+                    String.format(
+                        "Detected old version (pre 5.0.1) of GemFire or non-GemFire during handshake due to initial byte being %s",
+                        b));
+              }
+              byte handshakeByte = dis.readByte();
+              if (handshakeByte != HANDSHAKE_VERSION) {
+                throw new IllegalStateException(
+                    String.format(
+                        "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
+                        HANDSHAKE_VERSION, handshakeByte));
+              }
+              InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
+              setRemoteAddr(remote);
+              this.sharedResource = dis.readBoolean();
+              this.preserveOrder = dis.readBoolean();
+              this.uniqueId = dis.readLong();
+              // read the product version ordinal for on-the-fly serialization
+              // transformations (for rolling upgrades)
+              this.remoteVersion = Version.readVersion(dis, true);
+              int dominoNumber = 0;
+              if (this.remoteVersion == null
+                  || (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) {
+                dominoNumber = dis.readInt();
+                if (this.sharedResource) {
+                  dominoNumber = 0;
                 }
-                if (!this.sharedResource) {
-                  if (tipDomino()) {
-                    logger.info(
-                        "thread owned receiver forcing itself to send on thread owned sockets");
-                    // bug #49565 - if domino count is >= 2 use shared resources.
-                    // Also see DistributedCacheOperation#supportsDirectAck
-                  } else { // if (dominoNumber < 2) {
-                    ConnectionTable.threadWantsOwnResources();
-                    if (logger.isDebugEnabled()) {
-                      logger.debug(
-                          "thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets",
-                          dominoNumber);
-                    }
-                    // } else {
-                    // ConnectionTable.threadWantsSharedResources();
+                dominoCount.set(dominoNumber);
+                // this.senderName = dis.readUTF();
+              }
+              if (!this.sharedResource) {
+                if (tipDomino()) {
+                  logger.info(
+                      "thread owned receiver forcing itself to send on thread owned sockets");
+                  // bug #49565 - if domino count is >= 2 use shared resources.
+                  // Also see DistributedCacheOperation#supportsDirectAck
+                } else { // if (dominoNumber < 2) {
+                  ConnectionTable.threadWantsOwnResources();
+                  if (logger.isDebugEnabled()) {
+                    logger.debug(
+                        "thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets",
+                        dominoNumber);
                   }
-                  this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
-                  // Because this thread is not shared resource, it will be used for direct
-                  // ack. Direct ack messages can be large. This call will resize the send
-                  // buffer.
-                  setSendBufferSize(this.socket);
+                  // } else {
+                  // ConnectionTable.threadWantsSharedResources();
                 }
-                // String name = owner.getDM().getConfig().getName();
-                // if (name == null) {
-                // name = "pid="+OSProcess.getId();
-                // }
-                setThreadName(dominoNumber);
-              } catch (Exception e) {
-                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
-                logger.fatal("Error deserializing P2P handshake message", e);
-                this.readerShuttingDown = true;
-                requestClose("Error deserializing P2P handshake message");
-                return;
-              }
-              if (logger.isDebugEnabled()) {
-                logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr,
-                    (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
+                this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
+                // Because this thread is not shared resource, it will be used for direct
+                // ack. Direct ack messages can be large. This call will resize the send
+                // buffer.
+                setSendBufferSize(this.socket);
               }
-              try {
-                String authInit = System.getProperty(
-                    DistributionConfigImpl.SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
-                boolean isSecure = authInit != null && authInit.length() != 0;
-
-                if (isSecure) {
-                  if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
-                    sendOKHandshakeReply(); // fix for bug 33224
-                    notifyHandshakeWaiter(true);
-                  } else {
-                    // ARB: check if we need notifyHandshakeWaiter() call.
-                    notifyHandshakeWaiter(false);
-                    logger.warn("{} timed out during a membership check.",
-                        p2pReaderName());
-                    return;
-                  }
-                } else {
+              // String name = owner.getDM().getConfig().getName();
+              // if (name == null) {
+              // name = "pid="+OSProcess.getId();
+              // }
+              setThreadName(dominoNumber);
+            } catch (Exception e) {
+              this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
+              logger.fatal("Error deserializing P2P handshake message", e);
+              this.readerShuttingDown = true;
+              requestClose("Error deserializing P2P handshake message");
+              return;
+            }
+            if (logger.isDebugEnabled()) {
+              logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr,
+                  (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
+            }
+            try {
+              String authInit = System.getProperty(
+                  DistributionConfigImpl.SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
+              boolean isSecure = authInit != null && authInit.length() != 0;
+
+              if (isSecure) {
+                if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
                   sendOKHandshakeReply(); // fix for bug 33224
-                  try {
-                    notifyHandshakeWaiter(true);
-                  } catch (Exception e) {
-                    logger.fatal("Uncaught exception from listener", e);
-                  }
+                  notifyHandshakeWaiter(true);
+                } else {
+                  // ARB: check if we need notifyHandshakeWaiter() call.
+                  notifyHandshakeWaiter(false);
+                  logger.warn("{} timed out during a membership check.",
+                      p2pReaderName());
+                  return;
                 }
-              } catch (IOException ex) {
-                final String err = "Failed sending handshake reply";
-                if (logger.isDebugEnabled()) {
-                  logger.debug(err, ex);
+              } else {
+                sendOKHandshakeReply(); // fix for bug 33224
+                try {
+                  notifyHandshakeWaiter(true);
+                } catch (Exception e) {
+                  logger.fatal("Uncaught exception from listener", e);
                 }
-                this.readerShuttingDown = true;
-                requestClose(err + ": " + ex);
-                return;
               }
+            } catch (IOException ex) {
+              final String err = "Failed sending handshake reply";
+              if (logger.isDebugEnabled()) {
+                logger.debug(err, ex);
+              }
+              this.readerShuttingDown = true;
+              requestClose(err + ": " + ex);
+              return;
             }
           }
-          if (!connected) {
-            continue;
-          }
-          accessed();
-          nioInputBuffer.limit(oldLimit);
-          nioInputBuffer.position(startPos + nioMessageLength);
-        } else {
-          done = true;
-          compactOrResizeBuffer(nioMessageLength);
         }
+        if (!connected) {
+          continue;
+        }
+        accessed();
+        peerDataBuffer.limit(oldLimit);
+        peerDataBuffer.position(startPos + messageLength);
       } else {
         done = true;
-        if (nioInputBuffer.position() != 0) {
-          nioInputBuffer.compact();
+        if (TCPConduit.useSSL) {
+          ioFilter.doneReading(peerDataBuffer);
         } else {
-          nioInputBuffer.position(nioInputBuffer.limit());
-          nioInputBuffer.limit(nioInputBuffer.capacity());
+          // if not using SSL we might want to increase our nioInputBuffer size
+          compactOrResizeBuffer(messageLength);
         }
       }
     }
@@ -3861,34 +3282,35 @@ public class Connection implements Runnable {
   }
 
   private void compactOrResizeBuffer(int messageLength) {
-    final int oldBufferSize = nioInputBuffer.capacity();
+    final int oldBufferSize = inputBuffer.capacity();
     final DMStats stats = this.owner.getConduit().getStats();
     int allocSize = messageLength + MSG_HEADER_BYTES;
     if (oldBufferSize < allocSize) {
       // need a bigger buffer
       logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
-          Integer.valueOf(allocSize), Integer.valueOf(oldBufferSize));
-      ByteBuffer oldBuffer = nioInputBuffer;
-      nioInputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
+          allocSize, oldBufferSize);
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
 
       if (oldBuffer != null) {
         int oldByteCount = oldBuffer.remaining();
-        nioInputBuffer.put(oldBuffer);
-        nioInputBuffer.position(oldByteCount);
+        inputBuffer.put(oldBuffer);
+        inputBuffer.position(oldByteCount);
         Buffers.releaseReceiveBuffer(oldBuffer, stats);
       }
     } else {
-      if (nioInputBuffer.position() != 0) {
-        nioInputBuffer.compact();
+      if (inputBuffer.position() != 0) {
+        inputBuffer.compact();
       } else {
-        nioInputBuffer.position(nioInputBuffer.limit());
-        nioInputBuffer.limit(nioInputBuffer.capacity());
+        inputBuffer.position(inputBuffer.limit());
+        inputBuffer.limit(inputBuffer.capacity());
       }
     }
   }
 
   private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck) {
     try {
+      logger.info("BRUCE: dispatchMessage invoked for " + msg);
       msg.setDoDecMessagesBeingReceived(true);
       if (directAck) {
         Assert.assertTrue(!isSharedResource(),
@@ -3918,11 +3340,11 @@ public class Connection implements Runnable {
     return result;
   }
 
-  public boolean isSocketClosed() {
+  boolean isSocketClosed() {
     return this.socket.isClosed() || !this.socket.isConnected();
   }
 
-  public boolean isReceiverStopped() {
+  boolean isReceiverStopped() {
     return this.stopped;
   }
 
@@ -3945,7 +3367,7 @@ public class Connection implements Runnable {
   /**
    * Return the version of the member on the other side of this connection.
    */
-  public Version getRemoteVersion() {
+  Version getRemoteVersion() {
     return this.remoteVersion;
   }
 
@@ -3966,14 +3388,14 @@ public class Connection implements Runnable {
    * @return true if the connection was initiated here
    * @since GemFire 5.1
    */
-  protected boolean getOriginatedHere() {
+  boolean getOriginatedHere() {
     return !this.isReceiver;
   }
 
   /**
    * answers whether this connection is used for ordered message delivery
    */
-  protected boolean getPreserveOrder() {
+  boolean getPreserveOrder() {
     return preserveOrder;
   }
 
@@ -3987,14 +3409,14 @@ public class Connection implements Runnable {
   /**
    * answers the number of messages received by this connection
    */
-  protected long getMessagesReceived() {
+  long getMessagesReceived() {
     return messagesReceived;
   }
 
   /**
    * answers the number of messages sent on this connection
    */
-  protected long getMessagesSent() {
+  long getMessagesSent() {
     return messagesSent;
   }
 
@@ -4047,30 +3469,4 @@ public class Connection implements Runnable {
     releaseSendPermission();
   }
 
-  boolean nioChecked;
-  boolean useNIO;
-
-  private boolean useNIO() {
-    if (TCPConduit.useSSL) {
-      return false;
-    }
-    if (this.nioChecked) {
-      return this.useNIO;
-    }
-    this.nioChecked = true;
-    this.useNIO = this.owner.getConduit().useNIO();
-    if (!this.useNIO) {
-      return false;
-    }
-    // JDK bug 6230761 - NIO can't be used with IPv6 on Windows
-    if (this.socket != null && (this.socket.getInetAddress() instanceof Inet6Address)) {
-      String os = System.getProperty("os.name");
-      if (os != null) {
-        if (os.contains("Windows")) {
-          this.useNIO = false;
-        }
-      }
-    }
-    return this.useNIO;
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 8bf3b3a..e99762c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -47,6 +47,7 @@ import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingExecutors;
+import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.SocketCloser;
 
 /**
@@ -207,14 +208,15 @@ public class ConnectionTable {
   }
 
   /** conduit calls acceptConnection after an accept */
-  protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
+  protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory,
+      NioFilter nioFilter)
       throws IOException, ConnectionException, InterruptedException {
     InetAddress connAddress = sock.getInetAddress(); // for bug 44736
     boolean finishedConnecting = false;
     Connection connection = null;
     // boolean exceptionLogged = false;
     try {
-      connection = peerConnectionFactory.createReceiver(this, sock);
+      connection = peerConnectionFactory.createReceiver(this, sock, nioFilter);
 
       // check for shutdown (so it doesn't get missed in the finally block)
       this.owner.getCancelCriterion().checkCancelInProgress(null);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
index 600d967..f54860b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
+import org.apache.geode.internal.net.Buffers;
 
 /**
  * MsgOutputStream should no longer be used except in Connection to do the handshake. Otherwise
@@ -37,7 +38,7 @@ public class MsgOutputStream extends OutputStream implements ObjToByteArraySeria
    * The caller of this constructor is responsible for managing the allocated instance.
    */
   public MsgOutputStream(int allocSize) {
-    if (TCPConduit.useDirectBuffers) {
+    if (Buffers.useDirectBuffers) {
       this.buffer = ByteBuffer.allocateDirect(allocSize);
     } else {
       this.buffer = ByteBuffer.allocate(allocSize);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index a09d2f2..2fb5a34 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -38,6 +38,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.Buffers;
 
 /**
  * <p>
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
index a4e35a4..afa9886 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
 import org.apache.geode.internal.Version;
+import org.apache.geode.internal.net.Buffers;
 
 /**
  * A message reader which reads from the socket using (blocking) nio.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
index 148c27a..27dab2c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
@@ -18,14 +18,21 @@ package org.apache.geode.internal.tcp;
 import java.io.IOException;
 import java.net.Socket;
 
+import org.apache.geode.internal.net.NioEngine;
+import org.apache.geode.internal.net.NioFilter;
+
 public class PeerConnectionFactory {
   /**
    * creates a connection that we accepted (it was initiated by an explicit connect being done on
    * the other side). We will only receive data on this socket; never send.
    */
-  public Connection createReceiver(ConnectionTable table, Socket socket)
+  public Connection createReceiver(ConnectionTable table, Socket socket,
+      NioFilter nioFilter)
       throws IOException, ConnectionException {
-    Connection connection = new Connection(table, socket);
+    if (nioFilter == null) {
+      nioFilter = new NioEngine();
+    }
+    Connection connection = new Connection(table, socket, nioFilter);
     connection.initReceiver();
     return connection;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 0057847..44a096d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -15,7 +15,6 @@
 package org.apache.geode.internal.tcp;
 
 import java.io.IOException;
-import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -31,8 +30,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import javax.net.ssl.SSLException;
-
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
@@ -53,6 +50,9 @@ import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingExecutors;
 import org.apache.geode.internal.logging.LoggingThread;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.NioEngine;
+import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
@@ -110,18 +110,6 @@ public class TCPConduit implements Runnable {
   static boolean useSSL;
 
   /**
-   * Force use of Sockets rather than SocketChannels (NIO). Note from Bruce: due to a bug in the
-   * java VM, NIO cannot be used with IPv6 addresses on Windows. When that condition holds, the
-   * useNIO flag must be disregarded.
-   */
-  private static boolean USE_NIO;
-
-  /**
-   * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
-   */
-  static boolean useDirectBuffers;
-
-  /**
    * The socket producer used by the cluster
    */
   private final SocketCreator socketCreator;
@@ -129,11 +117,6 @@ public class TCPConduit implements Runnable {
 
   private MembershipManager membershipManager;
 
-  /**
-   * true if NIO can be used for the server socket
-   */
-  private boolean useNIO;
-
   static {
     init();
   }
@@ -148,13 +131,13 @@ public class TCPConduit implements Runnable {
 
   public static void init() {
     useSSL = Boolean.getBoolean("p2p.useSSL");
-    // only use nio if not SSL
-    USE_NIO = !useSSL && !Boolean.getBoolean("p2p.oldIO");
     // only use direct buffers if we are using nio
-    useDirectBuffers = USE_NIO && !Boolean.getBoolean("p2p.nodirectBuffers");
-    LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000).intValue();
+    LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
     // note: bug 37730 concerned this defaulting to 50
-    BACKLOG = Integer.getInteger("p2p.backlog", 1280).intValue();
+    BACKLOG = Integer.getInteger("p2p.backlog", 1280);
+    if (Boolean.getBoolean("p2p.oldIO")) {
+      logger.warn("detected use of p2p.oldIO setting - this is no longer supported");
+    }
   }
 
   ///////////////// permanent conduit state
@@ -162,8 +145,8 @@ public class TCPConduit implements Runnable {
   /**
    * the size of OS TCP/IP buffers, not set by default
    */
-  public int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
-  public int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
+  int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+  int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
 
   /**
    * port is the tcp/ip port that this conduit binds to. If it is zero, a port from
@@ -278,24 +261,12 @@ public class TCPConduit implements Runnable {
     this.socketCreator =
         SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
 
-    this.useNIO = USE_NIO;
-    if (this.useNIO) {
-      InetAddress addr = address;
-      if (addr == null) {
-        try {
-          addr = SocketCreator.getLocalHost();
-        } catch (java.net.UnknownHostException e) {
-          throw new ConnectionException("Unable to resolve localHost address", e);
-        }
-      }
-      // JDK bug 6230761 - NIO can't be used with IPv6 on Windows
-      if (addr instanceof Inet6Address) {
-        String os = System.getProperty("os.name");
-        if (os != null) {
-          if (os.indexOf("Windows") != -1) {
-            this.useNIO = false;
-          }
-        }
+    InetAddress addr = address;
+    if (addr == null) {
+      try {
+        addr = SocketCreator.getLocalHost();
+      } catch (java.net.UnknownHostException e) {
+        throw new ConnectionException("Unable to resolve localHost address", e);
       }
     }
 
@@ -353,9 +324,9 @@ public class TCPConduit implements Runnable {
   private volatile Exception shutdownCause;
 
   private static final int HANDSHAKE_POOL_SIZE =
-      Integer.getInteger("p2p.HANDSHAKE_POOL_SIZE", 10).intValue();
+      Integer.getInteger("p2p.HANDSHAKE_POOL_SIZE", 10);
   private static final long HANDSHAKE_POOL_KEEP_ALIVE_TIME =
-      Long.getLong("p2p.HANDSHAKE_POOL_KEEP_ALIVE_TIME", 60).longValue();
+      Long.getLong("p2p.HANDSHAKE_POOL_KEEP_ALIVE_TIME", 60);
 
   /**
    * added to fix bug 40436
@@ -378,7 +349,7 @@ public class TCPConduit implements Runnable {
     InetAddress ba = this.address;
 
     {
-      ExecutorService tmp_hsPool = null;
+      ExecutorService tmp_hsPool;
       String threadName = "P2P-Handshaker " + ba + ":" + p + " Thread ";
       try {
         tmp_hsPool =
@@ -428,61 +399,35 @@ public class TCPConduit implements Runnable {
     InetAddress bindAddress = this.address;
 
     try {
-      if (this.useNIO) {
-        if (serverPort <= 0) {
+      if (serverPort <= 0) {
 
-          socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
-              connectionRequestBacklog, isBindAddress,
-              this.useNIO, 0, tcpPortRange);
-        } else {
-          ServerSocketChannel channel = ServerSocketChannel.open();
-          socket = channel.socket();
-
-          InetSocketAddress inetSocketAddress =
-              new InetSocketAddress(isBindAddress ? bindAddress : null, serverPort);
-          socket.bind(inetSocketAddress, connectionRequestBacklog);
-        }
-
-        if (useNIO) {
-          try {
-            // set these buffers early so that large buffers will be allocated
-            // on accepted sockets (see java.net.ServerSocket.setReceiverBufferSize javadocs)
-            socket.setReceiveBufferSize(tcpBufferSize);
-            int newSize = socket.getReceiveBufferSize();
-            if (newSize != tcpBufferSize) {
-              logger.info("{} is {} instead of the requested {}",
-                  "Listener receiverBufferSize", Integer.valueOf(newSize),
-                  Integer.valueOf(tcpBufferSize));
-            }
-          } catch (SocketException ex) {
-            logger.warn("Failed to set listener receiverBufferSize to {}",
-                tcpBufferSize);
-          }
-        }
-        channel = socket.getChannel();
+        socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
+            connectionRequestBacklog, isBindAddress,
+            true, 0, tcpPortRange);
       } else {
-        try {
-          if (serverPort <= 0) {
-            socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
-                connectionRequestBacklog, isBindAddress,
-                this.useNIO, this.tcpBufferSize, tcpPortRange);
-          } else {
-            socket = socketCreator.createServerSocket(serverPort, connectionRequestBacklog,
-                isBindAddress ? bindAddress : null,
-                this.tcpBufferSize);
-          }
-          int newSize = socket.getReceiveBufferSize();
-          if (newSize != this.tcpBufferSize) {
-            logger.info("Listener receiverBufferSize is {} instead of the requested {}",
-                Integer.valueOf(newSize),
-                Integer.valueOf(this.tcpBufferSize));
-          }
-        } catch (SocketException ex) {
-          logger.warn("Failed to set listener receiverBufferSize to {}",
-              this.tcpBufferSize);
+        ServerSocketChannel channel = ServerSocketChannel.open();
+        socket = channel.socket();
 
+        InetSocketAddress inetSocketAddress =
+            new InetSocketAddress(isBindAddress ? bindAddress : null, serverPort);
+        socket.bind(inetSocketAddress, connectionRequestBacklog);
+      }
+
+      try {
+        // set these buffers early so that large buffers will be allocated
+        // on accepted sockets (see java.net.ServerSocket.setReceiverBufferSize javadocs)
+        socket.setReceiveBufferSize(tcpBufferSize);
+        int newSize = socket.getReceiveBufferSize();
+        if (newSize != tcpBufferSize) {
+          logger.info("{} is {} instead of the requested {}",
+              "Listener receiverBufferSize", Integer.valueOf(newSize),
+              Integer.valueOf(tcpBufferSize));
         }
+      } catch (SocketException ex) {
+        logger.warn("Failed to set listener receiverBufferSize to {}",
+            tcpBufferSize);
       }
+      channel = socket.getChannel();
       port = socket.getLocalPort();
     } catch (IOException io) {
       throw new ConnectionException(
@@ -550,7 +495,7 @@ public class TCPConduit implements Runnable {
         // set timeout endpoint here since interrupt() has been known
         // to hang
         long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
-        Thread t = this.thread;;
+        Thread t = this.thread;
         if (channel != null) {
           channel.close();
           // NOTE: do not try to interrupt the listener thread at this point.
@@ -576,7 +521,7 @@ public class TCPConduit implements Runnable {
         if (t != null && t.isAlive()) {
           logger.warn(
               "Unable to shut down listener within {}ms.  Unable to interrupt socket.accept() due to JDK bug. Giving up.",
-              Integer.valueOf(LISTENER_CLOSE_TIMEOUT));
+              LISTENER_CLOSE_TIMEOUT);
         }
       } catch (IOException | InterruptedException e) {
         // we're already trying to shutdown, ignore
@@ -652,21 +597,8 @@ public class TCPConduit implements Runnable {
 
       Socket othersock = null;
       try {
-        if (this.useNIO) {
-          SocketChannel otherChannel = channel.accept();
-          othersock = otherChannel.socket();
-        } else {
-          try {
-            othersock = socket.accept();
-          } catch (SSLException ex) {
-            // SW: This is the case when there is a problem in P2P
-            // SSL configuration, so need to exit otherwise goes into an
-            // infinite loop just filling the logs
-            logger.warn("Stopping P2P listener due to SSL configuration problem.",
-                ex);
-            break;
-          }
-        }
+        SocketChannel otherChannel = channel.accept();
+        othersock = otherChannel.socket();
         if (stopped) {
           try {
             if (othersock != null) {
@@ -769,17 +701,19 @@ public class TCPConduit implements Runnable {
     return result;
   }
 
-  protected void basicAcceptConnection(Socket othersock) {
+  private void basicAcceptConnection(Socket othersock) {
     try {
       othersock.setSoTimeout(0);
-      socketCreator.handshakeIfSocketIsSSL(othersock, idleConnectionTimeout);
-      getConTable().acceptConnection(othersock, new PeerConnectionFactory());
-    } catch (IOException io) {
-      // exception is logged by the Connection
-      if (!stopped) {
-        this.getStats().incFailedAccept();
+      SocketChannel channel = othersock.getChannel();
+      NioFilter nioFilter = null;
+      if (useSSL && channel != null) {
+        nioFilter = socketCreator.handshakeSSLSocketChannel(channel, idleConnectionTimeout, false,
+            Buffers.useDirectBuffers);
+      } else {
+        nioFilter = new NioEngine();
       }
-    } catch (ConnectionException ex) {
+      getConTable().acceptConnection(othersock, new PeerConnectionFactory(), nioFilter);
+    } catch (IOException | ConnectionException io) {
       // exception is logged by the Connection
       if (!stopped) {
         this.getStats().incFailedAccept();
@@ -795,13 +729,6 @@ public class TCPConduit implements Runnable {
   }
 
   /**
-   * return true if "new IO" classes are being used for the server socket
-   */
-  protected boolean useNIO() {
-    return this.useNIO;
-  }
-
-  /**
    * records the current outgoing message count on all thread-owned ordered connections
    *
    * @since GemFire 5.1
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
index 075b252..7c87da1 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
@@ -20,9 +20,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.InputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
+import java.nio.channels.SocketChannel;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,7 +65,7 @@ public class ConnectionJUnitTest {
 
     // NIO can't be mocked because SocketChannel has a final method that
     // is used by Connection - configureBlocking
-    when(conduit.useNIO()).thenReturn(false);
+    // when(conduit.useNIO()).thenReturn(false);
 
     // mock the distribution manager and membership manager
     when(distMgr.getMembershipManager()).thenReturn(membership);
@@ -75,12 +74,9 @@ public class ConnectionJUnitTest {
     SocketCloser closer = mock(SocketCloser.class);
     when(table.getSocketCloser()).thenReturn(closer);
 
-    InputStream instream = mock(InputStream.class);
-    when(instream.read()).thenReturn(-1);
-    Socket socket = mock(Socket.class);
-    when(socket.getInputStream()).thenReturn(instream);
+    SocketChannel channel = SocketChannel.open();
 
-    Connection conn = new Connection(table, socket);
+    Connection conn = new Connection(table, channel.socket(), null);
     conn.setSharedUnorderedForTest();
     conn.run();
     verify(membership).suspectMember(isNull(InternalDistributedMember.class), any(String.class));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
index ff46493..553b7ce 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
@@ -65,7 +65,7 @@ public class ConnectionTableTest {
     connectionTable = ConnectionTable.create(tcpConduit);
 
     factory = mock(PeerConnectionFactory.class);
-    when(factory.createReceiver(connectionTable, socket)).thenReturn(connection);
+    when(factory.createReceiver(connectionTable, socket, null)).thenReturn(connection);
   }
 
   @Test
@@ -74,7 +74,7 @@ public class ConnectionTableTest {
     when(connection.isSocketClosed()).thenReturn(true); // Pretend this closed as soon at it was
                                                         // created
 
-    connectionTable.acceptConnection(socket, factory);
+    connectionTable.acceptConnection(socket, factory, null);
     assertEquals(0, connectionTable.getNumberOfReceivers());
   }
 
@@ -84,7 +84,7 @@ public class ConnectionTableTest {
 
     when(connection.isReceiverStopped()).thenReturn(true);// but receiver is stopped
 
-    connectionTable.acceptConnection(socket, factory);
+    connectionTable.acceptConnection(socket, factory, null);
     assertEquals(0, connectionTable.getNumberOfReceivers());
   }
 
@@ -92,7 +92,7 @@ public class ConnectionTableTest {
   public void testSocketNotClosedAddedAsReceivers() throws Exception {
     when(connection.isSocketClosed()).thenReturn(false);// connection is not closed
 
-    connectionTable.acceptConnection(socket, factory);
+    connectionTable.acceptConnection(socket, factory, null);
     assertEquals(1, connectionTable.getNumberOfReceivers());
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index e7928b9..77160c8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -38,9 +38,9 @@ public class ConnectionTest {
     boolean forceAsync = true;
     DistributionMessage mockDistributionMessage = mock(DistributionMessage.class);
 
-    mockConnection.nioWriteFully(channel, buffer, forceAsync, mockDistributionMessage);
+    mockConnection.writeFully(channel, buffer, forceAsync, mockDistributionMessage);
 
-    verify(mockConnection, times(1)).nioWriteFully(channel, buffer, forceAsync,
+    verify(mockConnection, times(1)).writeFully(channel, buffer, forceAsync,
         mockDistributionMessage);
   }
 }
diff --git a/geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt b/geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt
index d1d7ff8..7776fcc 100644
--- a/geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt
+++ b/geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt
@@ -16,7 +16,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000749c85bd8> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -31,7 +31,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000749c859a0> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -61,7 +61,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000749c85768> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -176,7 +176,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000749c84fd0> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -1080,7 +1080,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x00000007435986d8> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -1111,7 +1111,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x00000007435984a0> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -1389,7 +1389,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x00000006479b3c70> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -2074,7 +2074,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000647a66b00> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 
@@ -2108,7 +2108,7 @@ Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.144-b01 mixed mode):
 	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 	- locked <0x0000000647a92678> (a java.lang.Object)
-	at org.apache.geode.internal.tcp.Connection.runNioReader(Connection.java:1810)
+	at readMessages(Connection.java:1810)
 	at org.apache.geode.internal.tcp.Connection.run(Connection.java:1690)
 	at java.lang.Thread.run(Thread.java:748)
 


Mime
View raw message