geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [geode] branch develop updated: GEODE-4059: Changing protobuf handshake to not need communication mode bytes
Date Fri, 08 Dec 2017 17:52:30 GMT
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 15bb387  GEODE-4059: Changing protobuf handshake to not need communication mode bytes
15bb387 is described below

commit 15bb387bd086447ec55b264966819e70d40f608d
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
AuthorDate: Fri Dec 8 09:51:07 2017 -0800

    GEODE-4059: Changing protobuf handshake to not need communication mode bytes
    
    This closes pull request #1137
    
    Squashed commit of the following:
    
    commit d9f5144cd7f7ddcf42eb7eb5febed531cbf67362
    Merge: 6fa9bf6c8 8fd707201
    Author: Bruce Schuchardt <bschuchardt@pivotal.io>
    Date:   Fri Dec 8 09:34:55 2017 -0800
    
        Merge branch 'feature/GEODE-4059' of https://github.com/WireBaron/geode into WireBaron-feature/GEODE-4059
    
    commit 8fd707201f0e02b1fdf81c1596f93d3fd9ccb0cf
    Author: Brian Rowe <browe@pivotal.io>
    Date:   Thu Dec 7 14:53:28 2017 -0800
    
        Removing extra handshake from a unit test.
    
        Signed-off-by: Galen O'Sullivan <gosullivan@pivotal.io>
    
    commit 7d07c6f5163d99b92a12ca856a1b397fda6e0eb4
    Author: Galen O'Sullivan <gosullivan@pivotal.io>
    Date:   Wed Dec 6 16:20:25 2017 -0800
    
        GEODE-4059: Changing protobuf handshake to not need communication mode bytes
    
        - Moved handshake proto messages into top level handshake.proto
        - Changed protobuf communication mode to match leading byte of handshake
        - Updated handshake handler and tests to handle changes
        - Modified locator to accept protobuf communication mode in lieu of gossip
    
        Signed-off-by: Brian Rowe <browe@pivotal.io>
---
 .../protocol/state/ConnectionStateProcessor.java   |  17 +-
 .../distributed/internal/tcpserver/TcpServer.java  | 202 +++++++++++----------
 .../protocol/ClientProtocolServiceLoader.java      |   6 +-
 .../internal/cache/tier/CommunicationMode.java     |  25 ++-
 .../tier/sockets/ServerConnectionFactory.java      |  18 +-
 .../src/main/proto/handshake.proto                 |  31 ++--
 .../src/main/proto/v1/basicTypes.proto             |   3 -
 .../src/main/proto/v1/clientProtocol.proto         |   3 -
 .../src/main/proto/v1/connection_API.proto         |  20 --
 .../protobuf/v1/ProtobufLocatorPipeline.java       |  23 +++
 .../protobuf/v1/ProtobufProtocolService.java       |   3 +-
 .../protobuf/v1/ProtobufStreamProcessor.java       |   5 +
 .../protobuf/v1/operations/HandshakeHandler.java   |  54 ++++++
 .../HandshakeRequestOperationHandler.java          |  65 -------
 .../protobuf/v1/operations/VersionValidator.java   |  10 +-
 .../registry/ProtobufOperationContextRegistry.java |   8 -
 .../ProtobufConnectionHandshakeStateProcessor.java |  36 +++-
 .../protobuf/v1/AuthenticationIntegrationTest.java |  11 +-
 .../protobuf/v1/AuthorizationIntegrationTest.java  |  12 +-
 .../protobuf/v1/HandshakeIntegrationTest.java      |  82 +++------
 .../internal/protocol/protobuf/v1/MessageUtil.java |  38 ++++
 .../v1/acceptance/CacheConnectionJUnitTest.java    |  18 +-
 .../CacheConnectionTimeoutJUnitTest.java           |  11 +-
 .../v1/acceptance/CacheMaxConnectionJUnitTest.java |  19 +-
 .../v1/acceptance/CacheOperationsJUnitTest.java    |  14 +-
 .../v1/acceptance/LocatorConnectionDUnitTest.java  |   7 +-
 .../v1/operations/HandshakeHandlerJUnitTest.java   | 140 ++++++++++++++
 .../HandshakeRequestOperationHandlerJUnitTest.java | 191 -------------------
 .../v1/operations/VersionValidatorJUnitTest.java   |  10 +-
 29 files changed, 492 insertions(+), 590 deletions(-)

diff --git a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
index e0d18b3..b55524f 100644
--- a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
+++ b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java
@@ -14,6 +14,10 @@
  */
 package org.apache.geode.internal.protocol.state;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.OperationContext;
 import org.apache.geode.internal.protocol.ProtocolErrorCode;
@@ -48,15 +52,14 @@ public interface ConnectionStateProcessor {
   }
 
   /**
-   * This indicates whether this specific state processor is able to handle handshake requests.
+   * Allow the state processor to take over the entire processing of a given message.
    *
-   * @return specialized ConnectionHandshakingStateProcessor interface implementation which can move
-   *         to a new state
-   * @throws ConnectionStateException if unable to handle handshakes in this state.
+   * @return - True if the message has been handled by the state processor, false to continue normal
+   *         processing.
    */
-  default ConnectionHandshakingStateProcessor allowHandshake() throws ConnectionStateException {
-    throw new ConnectionStateException(ProtocolErrorCode.UNSUPPORTED_OPERATION,
-        "Requested operation not allowed at this time");
+  default boolean handleMessageIndependently(InputStream inputStream, OutputStream outputStream,
+      MessageExecutionContext executionContext) throws IOException {
+    return false;
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index aadbfb4..4e002d8 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -44,6 +44,7 @@ import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.IncompatibleVersionException;
+import org.apache.geode.cache.UnsupportedVersionException;
 import org.apache.geode.distributed.internal.ClusterConfigurationService;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
@@ -364,7 +365,6 @@ public class TcpServer {
     executor.execute(() -> {
       long startTime = DistributionStats.getStatTime();
       DataInputStream input = null;
-      Object request, response;
       try {
 
         socket.setSoTimeout(READ_TIMEOUT);
@@ -379,89 +379,20 @@ public class TcpServer {
               + (socket.getInetAddress().getHostAddress() + ":" + socket.getPort()), e);
           return;
         }
-        int gossipVersion = readGossipVersion(socket, input);
-
-        short versionOrdinal;
-        if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) {
-          if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
-              && Boolean.getBoolean("geode.feature-protobuf-protocol")) {
-            try {
-              int protocolVersion = input.readUnsignedByte();
-              ClientProtocolService clientProtocolService =
-                  clientProtocolServiceLoader.lookupService(protocolVersion);
-              clientProtocolService.initializeStatistics("LocatorStats",
-                  internalLocator.getDistributedSystem());
-              try (ClientProtocolProcessor pipeline =
-                  clientProtocolService.createProcessorForLocator(internalLocator)) {
-                pipeline.processMessage(input, socket.getOutputStream());
-              } catch (IncompatibleVersionException e) {
-                // should not happen on the locator as there is no handshake.
-                log.error("Unexpected exception in client message processing", e);
-              }
-            } catch (ServiceLoadingFailureException e) {
-              log.error("There was an error looking up the client protocol service", e);
-              socket.close();
-              throw new IOException("There was an error looking up the client protocol service", e);
-            } catch (ServiceVersionNotFoundException e) {
-              log.error("Unable to find service matching the client protocol version byte", e);
-              socket.close();
-              throw new IOException(
-                  "Unable to find service matching the client protocol version byte", e);
-            }
-          } else {
-            rejectUnknownProtocolConnection(socket, gossipVersion);
-          }
-        } else if (gossipVersion <= getCurrentGossipVersion()
-            && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
-          // Create a versioned stream to remember sender's GemFire version
-          versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
-
-          if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
-            // Recent versions of TcpClient will send the version ordinal
-            versionOrdinal = input.readShort();
-          }
-
-          if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
-            log.debug("Locator reading request from " + socket.getInetAddress() + " with version "
-                + Version.fromOrdinal(versionOrdinal, false));
-          }
-          input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false));
-          request = DataSerializer.readObject(input);
-          if (log.isDebugEnabled()) {
-            log.debug("Locator received request " + request + " from " + socket.getInetAddress());
-          }
-          if (request instanceof ShutdownRequest) {
-            shuttingDown = true;
-            // Don't call shutdown from within the worker thread, see java bug #6576792.
-            // Closing the socket will cause our acceptor thread to shutdown the executor
-            this.serverSocketPortAtClose = srv_sock.getLocalPort();
-            srv_sock.close();
-            response = new ShutdownResponse();
-          } else if (request instanceof InfoRequest) {
-            response = handleInfoRequest(request);
-          } else if (request instanceof VersionRequest) {
-            response = handleVersionRequest(request);
-          } else {
-            response = handler.processRequest(request);
-          }
-
-          handler.endRequest(request, startTime);
-
-          startTime = DistributionStats.getStatTime();
-          if (response != null) {
-            DataOutputStream output = new DataOutputStream(socket.getOutputStream());
-            if (versionOrdinal != Version.CURRENT_ORDINAL) {
-              output =
-                  new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false));
-            }
-            DataSerializer.writeObject(response, output);
-            output.flush();
-          }
+        // read the first byte & check for an improperly configured client pool trying
+        // to contact a cache server
+        int firstByte = input.readUnsignedByte();
+        if (firstByte == CommunicationMode.ReservedForGossip.getModeNumber()) {
+          processOneConnection(socket, startTime, input);
+        } else if (firstByte == CommunicationMode.ProtobufClientServerProtocol.getModeNumber()) {
+          handleProtobufConnection(socket, input);
+        } else if (CommunicationMode.isValidMode(firstByte)) {
+          socket.getOutputStream().write(HandShake.REPLY_SERVER_IS_LOCATOR);
+          throw new Exception("Improperly configured client detected - use addPoolLocator to "
+              + "configure its locators instead of addPoolServer.");
 
-          handler.endResponse(request, startTime);
         } else {
-          // Close the socket. We can not accept requests from a newer version
-          rejectUnknownProtocolConnection(socket, gossipVersion);
+          rejectUnknownProtocolConnection(socket, firstByte);
         }
       } catch (EOFException ignore) {
         // client went away - ignore
@@ -516,32 +447,109 @@ public class TcpServer {
     });
   }
 
-  private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion)
-      throws IOException {
+  private void processOneConnection(Socket socket, long startTime, DataInputStream input)
+      throws IOException, UnsupportedVersionException, ClassNotFoundException {
+    // At this point we've read the leading byte of the gossip version and found it to be 0;
+    // continue by reading the next three bytes.
+    int gossipVersion = 0;
+    for (int i = 0; i < 3; i++) {
+      gossipVersion = (gossipVersion << 8) + (0xff & input.readUnsignedByte());
+    }
+
+    Object request;
+    Object response;
+    short versionOrdinal;
+    if (gossipVersion <= getCurrentGossipVersion()
+        && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
+      // Create a versioned stream to remember sender's GemFire version
+      versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
+
+      if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
+        // Recent versions of TcpClient will send the version ordinal
+        versionOrdinal = input.readShort();
+      }
+
+      if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
+        log.debug("Locator reading request from " + socket.getInetAddress() + " with version "
+            + Version.fromOrdinal(versionOrdinal, false));
+      }
+      input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false));
+      request = DataSerializer.readObject(input);
+      if (log.isDebugEnabled()) {
+        log.debug("Locator received request " + request + " from " + socket.getInetAddress());
+      }
+      if (request instanceof ShutdownRequest) {
+        shuttingDown = true;
+        // Don't call shutdown from within the worker thread, see java bug #6576792.
+        // Closing the socket will cause our acceptor thread to shutdown the executor
+        this.serverSocketPortAtClose = srv_sock.getLocalPort();
+        srv_sock.close();
+        response = new ShutdownResponse();
+      } else if (request instanceof InfoRequest) {
+        response = handleInfoRequest(request);
+      } else if (request instanceof VersionRequest) {
+        response = handleVersionRequest(request);
+      } else {
+        response = handler.processRequest(request);
+      }
+
+      handler.endRequest(request, startTime);
+
+      startTime = DistributionStats.getStatTime();
+      if (response != null) {
+        DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+        if (versionOrdinal != Version.CURRENT_ORDINAL) {
+          output =
+              new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false));
+        }
+        DataSerializer.writeObject(response, output);
+        output.flush();
+      }
+
+      handler.endResponse(request, startTime);
+    } else {
+      // Close the socket. We can not accept requests from a newer version
+      rejectUnknownProtocolConnection(socket, gossipVersion);
+    }
+  }
+
+  private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion) {
     try {
       socket.getOutputStream().write("unknown protocol version".getBytes());
       socket.getOutputStream().flush();
+      socket.close();
     } catch (IOException e) {
       log.debug("exception in sending reply to process using unknown protocol " + gossipVersion, e);
     }
-    socket.close();
   }
 
-  private int readGossipVersion(Socket sock, DataInputStream input) throws Exception {
-    // read the first byte & check for an improperly configured client pool trying
-    // to contact a cache server
-    int firstByte = input.readUnsignedByte();
-    if (CommunicationMode.isValidMode(firstByte)) {
-      sock.getOutputStream().write(HandShake.REPLY_SERVER_IS_LOCATOR);
-      throw new Exception("Improperly configured client detected - use addPoolLocator to "
-          + "configure its locators instead of addPoolServer.");
+  private void handleProtobufConnection(Socket socket, DataInputStream input) throws Exception {
+    if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
+      log.warn("Incoming protobuf connection, but protobuf not enabled on this locator.");
+      socket.close();
+      return;
     }
 
-    int gossipVersion = firstByte;
-    for (int i = 0; i < 3; i++) {
-      gossipVersion = (gossipVersion << 8) + (0xff & input.readUnsignedByte());
+    try {
+      ClientProtocolService clientProtocolService = clientProtocolServiceLoader.lookupService();
+      clientProtocolService.initializeStatistics("LocatorStats",
+          internalLocator.getDistributedSystem());
+      try (ClientProtocolProcessor pipeline =
+          clientProtocolService.createProcessorForLocator(internalLocator)) {
+        pipeline.processMessage(input, socket.getOutputStream());
+      } catch (IncompatibleVersionException e) {
+        // should not happen on the locator as there is no handshake.
+        log.error("Unexpected exception in client message processing", e);
+      }
+    } catch (ServiceLoadingFailureException e) {
+      log.error("There was an error looking up the client protocol service", e);
+      socket.close();
+      throw new IOException("There was an error looking up the client protocol service", e);
+    } catch (ServiceVersionNotFoundException e) {
+      log.error("Unable to find service matching the client protocol version byte", e);
+      socket.close();
+      throw new IOException("Unable to find service matching the client protocol version byte", e);
     }
-    return gossipVersion;
   }
 
   protected Object handleInfoRequest(Object request) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
index 4b66062..bf85bbc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
@@ -39,7 +39,7 @@ public class ClientProtocolServiceLoader {
     return resultList;
   }
 
-  public ClientProtocolService lookupService(int protocolVersion) {
+  public ClientProtocolService lookupService() {
     if (clientProtocolServices.isEmpty()) {
       throw new ServiceLoadingFailureException(
           "There is no ClientProtocolService implementation found in JVM");
@@ -50,10 +50,6 @@ public class ClientProtocolServiceLoader {
           "There is more than one ClientProtocolService implementation found in JVM; aborting");
     }
     ClientProtocolService clientProtocolService = clientProtocolServices.get(0);
-    if (clientProtocolService.getServiceProtocolVersion() != protocolVersion) {
-      throw new ServiceVersionNotFoundException(
-          "The ClientProtocolService doesn't match the requested version.");
-    }
     return clientProtocolService;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
index 053a556..e57b850 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
@@ -23,6 +23,19 @@ package org.apache.geode.internal.cache.tier;
  */
 public enum CommunicationMode {
   /**
+   * The first byte of any locator connection will be the high order byte of its gossip version,
+   * which will always be 0. Communication modes should not collide with this value.
+   */
+  ReservedForGossip((byte) 0, "Locator gossip version"),
+  /**
+   * For the new client-server protocol.
+   *
+   * Protobuf handshake messages are specially constructed so that this value will match the first
+   * byte sent, allowing clients to start protobuf connections with a handshake instead of
+   * communication mode bytes.
+   */
+  ProtobufClientServerProtocol((byte) 10, "Protobuf client"),
+  /**
    * Byte meaning that the Socket is being used for 'client to server' communication.
    */
   ClientToServer((byte) 100, "client"),
@@ -56,11 +69,7 @@ public enum CommunicationMode {
    * Byte meaning that the Socket is being used for 'client to server' messages related to a client
    * queue (register interest, create cq, etc.).
    */
-  ClientToServerForQueue((byte) 107, "clientToServerForQueue"),
-  /**
-   * For the new client-server protocol, which ignores the usual handshake mechanism.
-   */
-  ProtobufClientServerProtocol((byte) 110, "Protobuf client");
+  ClientToServerForQueue((byte) 107, "clientToServerForQueue");
 
   /**
    * is this a client-initiated operations connection?
@@ -128,11 +137,13 @@ public enum CommunicationMode {
    * check the given mode to see if it is assigned to one of the enumeration's instances
    */
   public static boolean isValidMode(int mode) {
-    return 100 <= mode && mode <= 110;
+    return (100 <= mode && mode <= 107) || mode == 10;
   }
 
   public static CommunicationMode fromModeNumber(byte modeNumber) {
     switch (modeNumber) {
+      case 10:
+        return ProtobufClientServerProtocol;
       case 100:
         return ClientToServer;
       case 101:
@@ -149,8 +160,6 @@ public enum CommunicationMode {
         return UnsuccessfulServerToClient;
       case 107:
         return ClientToServerForQueue;
-      case 110:
-        return ProtobufClientServerProtocol;
       default:
         throw new IllegalArgumentException("unknown communications mode: " + modeNumber);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index 7070b37..59d4533 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -44,9 +44,9 @@ public class ServerConnectionFactory {
 
 
   private synchronized ClientProtocolService getClientProtocolService(
-      StatisticsFactory statisticsFactory, String serverName, int protocolVersion) {
+      StatisticsFactory statisticsFactory, String serverName) {
     if (clientProtocolService == null) {
-      clientProtocolService = clientProtocolServiceLoader.lookupService(protocolVersion);
+      clientProtocolService = clientProtocolServiceLoader.lookupService();
       clientProtocolService.initializeStatistics(serverName, statisticsFactory);
     }
     return clientProtocolService;
@@ -60,11 +60,9 @@ public class ServerConnectionFactory {
       if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
         throw new IOException("Server received unknown communication mode: " + communicationMode);
       } else {
-        int protocolVersion = readProtocolVersionByte(socket);
         try {
           return createGenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
-              socketBufferSize, communicationModeStr, communicationMode, acceptor, securityService,
-              protocolVersion);
+              socketBufferSize, communicationModeStr, communicationMode, acceptor, securityService);
         } catch (ServiceLoadingFailureException ex) {
           throw new IOException("Could not load protobuf client protocol", ex);
         } catch (ServiceVersionNotFoundException ex) {
@@ -77,16 +75,12 @@ public class ServerConnectionFactory {
     }
   }
 
-  private int readProtocolVersionByte(Socket socket) throws IOException {
-    return socket.getInputStream().read();
-  }
-
   private ServerConnection createGenericProtocolServerConnection(Socket socket, InternalCache cache,
       CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
       String communicationModeStr, byte communicationMode, Acceptor acceptor,
-      SecurityService securityService, int protocolVersion) {
-    ClientProtocolService service = getClientProtocolService(cache.getDistributedSystem(),
-        acceptor.getServerName(), protocolVersion);
+      SecurityService securityService) {
+    ClientProtocolService service =
+        getClientProtocolService(cache.getDistributedSystem(), acceptor.getServerName());
 
     ClientProtocolProcessor processor = service.createProcessorForCache(cache, securityService);
 
diff --git a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java b/geode-protobuf-messages/src/main/proto/handshake.proto
similarity index 50%
rename from geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java
rename to geode-protobuf-messages/src/main/proto/handshake.proto
index f885bfc..b3eebf6 100644
--- a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java
+++ b/geode-protobuf-messages/src/main/proto/handshake.proto
@@ -1,7 +1,7 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
  * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
  *
@@ -12,17 +12,26 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.state;
 
-import org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
+syntax = "proto3";
+package org.apache.geode.internal.protocol.protobuf;
 
-public interface ConnectionHandshakingStateProcessor extends ConnectionStateProcessor {
-  @Override
-  default ConnectionHandshakingStateProcessor allowHandshake() throws ConnectionStateException {
-    return this;
-  }
+enum MajorVersions {
+    INVALID_MAJOR_VERSION = 0;  // Protobuf requires 0 based enum
+    CURRENT_MAJOR_VERSION = 1;  // Initial message structure and handshake protocol
+}
+enum MinorVersions {
+    INVALID_MINOR_VERSION = 0;  // Protobuf requires 0 based enum
+    CURRENT_MINOR_VERSION = 1;  // Protobuf implementation at initial release
+}
+
+message NewConnectionHandshake {
+    fixed32 majorVersion = 1;
+    fixed32 minorVersion = 2;
+}
 
-  // This is called when a handshake operation succeeds to get the processor for the next connection
-  // state.
-  ConnectionStateProcessor handshakeSucceeded();
+message HandshakeAcknowledgement {
+    int32 serverMajorVersion = 1;
+    int32 serverMinorVersion = 2;
+    bool handshakePassed = 3;
 }
diff --git a/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto b/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto
index 4fd0c96..d6be91b 100644
--- a/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto
@@ -22,9 +22,6 @@
 syntax = "proto3";
 package org.apache.geode.internal.protocol.protobuf.v1;
 
-import "google/protobuf/any.proto";
-import "google/protobuf/empty.proto";
-
 message Entry {
     EncodedValue key = 1;
     EncodedValue value = 2;
diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
index 992c8d6..b744fa6 100644
--- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
@@ -21,7 +21,6 @@
 syntax = "proto3";
 package org.apache.geode.internal.protocol.protobuf.v1;
 
-import "google/protobuf/any.proto";
 import "v1/region_API.proto";
 import "v1/locator_API.proto";
 import "v1/basicTypes.proto";
@@ -47,7 +46,6 @@ message Request {
         GetRegionRequest getRegionRequest = 42;
 
         AuthenticationRequest authenticationRequest = 100;
-        HandshakeRequest handshakeRequest = 101;
     }
 }
 
@@ -64,7 +62,6 @@ message Response {
         GetRegionResponse getRegionResponse = 42;
 
         AuthenticationResponse authenticationResponse = 100;
-        HandshakeResponse handshakeResponse = 101;
 
         ErrorResponse errorResponse = 1000;
     }
diff --git a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
index 176dc0d..7c4435e 100644
--- a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto
@@ -16,26 +16,6 @@
 syntax = "proto3";
 package org.apache.geode.internal.protocol.protobuf.v1;
 
-enum MajorVersions {
-    INVALID_MAJOR_VERSION = 0;  // Protobuf requires 0 based enum
-    CURRENT_MAJOR_VERSION = 1;  // Initial message structure and handshake protocol
-}
-enum MinorVersions {
-    INVALID_MINOR_VERSION = 0;  // Protobuf requires 0 based enum
-    CURRENT_MINOR_VERSION = 1;  // Protobuf implementation at initial release
-}
-
-message HandshakeRequest {
-    int32 majorVersion = 1;
-    int32 minorVersion = 2;
-}
-
-message HandshakeResponse {
-    int32 serverMajorVersion = 1;
-    int32 serverMinorVersion = 2;
-    bool handshakePassed = 3;
-}
-
 message AuthenticationRequest {
     map<string,string> credentials = 1;
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
index d67897f..0a30ae4 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
@@ -18,12 +18,16 @@ package org.apache.geode.internal.protocol.protobuf.v1;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.PushbackInputStream;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.IncompatibleVersionException;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
+import org.apache.geode.internal.protocol.protobuf.v1.operations.VersionValidator;
 import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
 import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
@@ -34,6 +38,7 @@ public final class ProtobufLocatorPipeline implements ClientProtocolProcessor {
   private final InternalLocator locator;
   private final ProtobufStreamProcessor streamProcessor;
   private final ConnectionStateProcessor locatorConnectionState;
+  private final VersionValidator validator;
 
   ProtobufLocatorPipeline(ProtobufStreamProcessor protobufStreamProcessor,
       ProtocolClientStatistics statistics, InternalLocator locator) {
@@ -42,11 +47,13 @@ public final class ProtobufLocatorPipeline implements ClientProtocolProcessor {
     this.locator = locator;
     this.statistics.clientConnected();
     this.locatorConnectionState = new NoSecurityConnectionStateProcessor();
+    this.validator = new VersionValidator();
   }
 
   @Override
   public void processMessage(InputStream inputStream, OutputStream outputStream)
       throws IOException, IncompatibleVersionException {
+    handleHandshakeMessage(inputStream);
     streamProcessor.receiveMessage(inputStream, outputStream,
         new MessageExecutionContext(locator, statistics, locatorConnectionState));
   }
@@ -61,4 +68,20 @@ public final class ProtobufLocatorPipeline implements ClientProtocolProcessor {
     // All locator connections are closed after one message, so this is not used
     return false;
   }
+
+  private void handleHandshakeMessage(InputStream inputStream) throws IOException {
+    // Incoming connection had the first byte removed to determine communication mode, add that
+    // back before parsing.
+    PushbackInputStream handshakeStream = new PushbackInputStream(inputStream);
+    handshakeStream.unread(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
+    Handshake.NewConnectionHandshake handshakeRequest =
+        Handshake.NewConnectionHandshake.parseDelimitedFrom(handshakeStream);
+    int majorVersion = handshakeRequest.getMajorVersion();
+    int minorVersion = handshakeRequest.getMinorVersion();
+    if (!validator.isValid(majorVersion, minorVersion)) {
+      throw new IOException(
+          "Invalid protobuf client version number: " + majorVersion + "." + minorVersion);
+    }
+  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
index 63660a9..23fc804 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
@@ -19,6 +19,7 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
 import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatisticsImpl;
 import org.apache.geode.internal.protocol.statistics.NoOpStatistics;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
@@ -62,6 +63,6 @@ public class ProtobufProtocolService implements ClientProtocolService {
 
   @Override
   public int getServiceProtocolVersion() {
-    return ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE;
+    return Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE;
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
index 543454f..be1c276 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
@@ -61,6 +61,11 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
   private void processOneMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext)
       throws InvalidProtocolMessageException, IOException {
+    if (executionContext.getConnectionStateProcessor().handleMessageIndependently(inputStream,
+        outputStream, executionContext)) {
+      return;
+    }
+
     ClientProtocol.Message message = protobufProtocolSerializer.deserialize(inputStream);
     if (message == null) {
       String errorMessage = "Tried to deserialize protobuf message at EOF";
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java
new file mode 100644
index 0000000..0132196
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.protocol.protobuf.Handshake;
+import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
+
+public class HandshakeHandler {
+  private static final Logger logger = LogManager.getLogger();
+  private static final VersionValidator validator = new VersionValidator();
+
+  public static boolean handleHandshake(InputStream inputStream, OutputStream outputStream,
+      ProtocolClientStatistics statistics) throws IOException {
+    Handshake.NewConnectionHandshake handshakeRequest =
+        Handshake.NewConnectionHandshake.parseDelimitedFrom(inputStream);
+
+    statistics.messageReceived(handshakeRequest.getSerializedSize());
+
+    final boolean handshakeSucceeded =
+        validator.isValid(handshakeRequest.getMajorVersion(), handshakeRequest.getMinorVersion());
+
+    Handshake.HandshakeAcknowledgement handshakeResponse = Handshake.HandshakeAcknowledgement
+        .newBuilder().setServerMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+        .setServerMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE)
+        .setHandshakePassed(handshakeSucceeded).build();
+
+    handshakeResponse.writeDelimitedTo(outputStream);
+    statistics.messageSent(handshakeResponse.getSerializedSize());
+    if (!handshakeSucceeded) {
+      throw new IOException("Incompatible protobuf version.");
+    }
+
+    return handshakeSucceeded;
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java
deleted file mode 100644
index 97338e6..0000000
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.protobuf.v1.operations;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.protocol.Failure;
-import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.Result;
-import org.apache.geode.internal.protocol.Success;
-import org.apache.geode.internal.protocol.operations.OperationHandler;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
-import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufResponseUtilities;
-import org.apache.geode.internal.protocol.serialization.SerializationService;
-import org.apache.geode.internal.protocol.state.ConnectionHandshakingStateProcessor;
-import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
-import org.apache.geode.internal.protocol.state.ConnectionTerminatingStateProcessor;
-import org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
-
-public class HandshakeRequestOperationHandler implements
-    OperationHandler<ConnectionAPI.HandshakeRequest, ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> {
-  private static final Logger logger = LogManager.getLogger();
-  private final VersionValidator validator = new VersionValidator();
-
-  @Override
-  public Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> process(
-      SerializationService serializationService, ConnectionAPI.HandshakeRequest request,
-      MessageExecutionContext messageExecutionContext)
-      throws InvalidExecutionContextException, ConnectionStateException {
-    ConnectionHandshakingStateProcessor stateProcessor;
-
-    // If handshake not allowed by this state this will throw a ConnectionStateException
-    stateProcessor = messageExecutionContext.getConnectionStateProcessor().allowHandshake();
-
-    final boolean handshakeSucceeded =
-        validator.isValid(request.getMajorVersion(), request.getMinorVersion());
-    if (handshakeSucceeded) {
-      ConnectionStateProcessor nextStateProcessor = stateProcessor.handshakeSucceeded();
-      messageExecutionContext.setConnectionStateProcessor(nextStateProcessor);
-    } else {
-      messageExecutionContext
-          .setConnectionStateProcessor(new ConnectionTerminatingStateProcessor());
-    }
-
-    return Success.of(ConnectionAPI.HandshakeResponse.newBuilder()
-        .setServerMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-        .setServerMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)
-        .setHandshakePassed(handshakeSucceeded).build());
-  }
-}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java
index 86eea86..9d6065b 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java
@@ -14,15 +14,15 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
 
 public class VersionValidator {
   private int majorVersion;
   private int minorVersion;
 
   public VersionValidator() {
-    this(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
-        ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+    this(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+        Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
   }
 
   VersionValidator(int majorVersion, int minorVersion) {
@@ -31,9 +31,9 @@ public class VersionValidator {
   }
 
   public boolean isValid(int majorVersion, int minorVersion) {
-    if (majorVersion != ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE
+    if (majorVersion != Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE
         && majorVersion == this.majorVersion) {
-      if (minorVersion != ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE
+      if (minorVersion != Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE
           && minorVersion <= this.minorVersion) {
         return true;
       }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
index 8109e6f..065bd5d 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
@@ -27,7 +27,6 @@ import org.apache.geode.internal.protocol.protobuf.v1.operations.GetAvailableSer
 import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionNamesRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRequestOperationHandler;
-import org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.PutAllRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.PutRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.RemoveRequestOperationHandler;
@@ -110,12 +109,5 @@ public class ProtobufOperationContextRegistry {
             opsResp -> ClientProtocol.Response.newBuilder().setGetAvailableServersResponse(opsResp),
             new ResourcePermission(ResourcePermission.Resource.CLUSTER,
                 ResourcePermission.Operation.READ)));
-
-    operationContexts.put(RequestAPICase.HANDSHAKEREQUEST,
-        new ProtobufOperationContext<>(ClientProtocol.Request::getHandshakeRequest,
-            new HandshakeRequestOperationHandler(),
-            opsResp -> ClientProtocol.Response.newBuilder().setHandshakeResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.DATA,
-                ResourcePermission.Operation.READ)));
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
index 58fa33a..ec39ec9 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
@@ -14,19 +14,23 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.state;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PushbackInputStream;
+
+import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.OperationContext;
 import org.apache.geode.internal.protocol.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeRequestOperationHandler;
-import org.apache.geode.internal.protocol.state.ConnectionHandshakingStateProcessor;
+import org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeHandler;
 import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
 import org.apache.geode.internal.protocol.state.LegacySecurityConnectionStateProcessor;
 import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
 import org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
 import org.apache.geode.internal.security.SecurityService;
 
-public class ProtobufConnectionHandshakeStateProcessor
-    implements ConnectionHandshakingStateProcessor {
+public class ProtobufConnectionHandshakeStateProcessor implements ConnectionStateProcessor {
   private final SecurityService securityService;
 
   public ProtobufConnectionHandshakeStateProcessor(SecurityService securityService) {
@@ -36,14 +40,11 @@ public class ProtobufConnectionHandshakeStateProcessor
   @Override
   public void validateOperation(MessageExecutionContext messageContext,
       OperationContext operationContext) throws ConnectionStateException {
-    if (!(operationContext.getOperationHandler() instanceof HandshakeRequestOperationHandler)) {
-      throw new ConnectionStateException(ProtocolErrorCode.HANDSHAKE_REQUIRED,
-          "Protobuf handshake must be completed before any other operation.");
-    }
+    throw new ConnectionStateException(ProtocolErrorCode.GENERIC_FAILURE,
+        "Connection processing should never be asked to validate an operation");
   }
 
-  @Override
-  public ConnectionStateProcessor handshakeSucceeded() {
+  private ConnectionStateProcessor nextConnectionState() {
     if (securityService.isIntegratedSecurity()) {
       return new ConnectionShiroAuthenticatingStateProcessor(securityService);
     } else if (securityService.isPeerSecurityRequired()
@@ -54,4 +55,19 @@ public class ProtobufConnectionHandshakeStateProcessor
       return new NoSecurityConnectionStateProcessor();
     }
   }
+
+  @Override
+  public boolean handleMessageIndependently(InputStream inputStream, OutputStream outputStream,
+      MessageExecutionContext executionContext) throws IOException {
+    // inputStream will have had the first byte stripped off to determine communication mode, add
+    // that byte back before processing message
+    PushbackInputStream messageStream = new PushbackInputStream(inputStream);
+    messageStream.unread(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
+    if (HandshakeHandler.handleHandshake(messageStream, outputStream,
+        executionContext.getStatistics())) {
+      executionContext.setConnectionStateProcessor(nextConnectionState());
+    }
+    return true;
+  }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java
index c3b6c73..f3f68f4 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java
@@ -87,19 +87,10 @@ public class AuthenticationIntegrationTest {
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
     inputStream = socket.getInputStream();
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
 
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    ClientProtocol.Message handshakeResponse = protobufProtocolSerializer.deserialize(inputStream);
-    assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+    MessageUtil.performAndVerifyHandshake(socket);
   }
 
   private static class SimpleSecurityManager implements SecurityManager {
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
index c78a75d..cc69333 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
@@ -107,23 +107,13 @@ public class AuthorizationIntegrationTest {
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
     inputStream = socket.getInputStream();
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     serializationService = new ProtobufSerializationService();
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
 
     when(mockSecurityManager.authorize(same(securityPrincipal), any())).thenReturn(false);
 
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    ClientProtocol.Message handshakeResponse =
-        ClientProtocol.Message.parseDelimitedFrom(inputStream);;
-    assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+    MessageUtil.performAndVerifyHandshake(socket);
 
     ClientProtocol.Message authenticationRequest = ClientProtocol.Message.newBuilder()
         .setRequest(ClientProtocol.Request.newBuilder()
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java
index de3038f..0d3fd24 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -40,8 +41,10 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
 import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
@@ -96,48 +99,18 @@ public class HandshakeIntegrationTest {
 
   @Test
   public void testNormalHandshakeSucceeds() throws Exception {
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    ClientProtocol.Message handshakeResponse = protobufProtocolSerializer.deserialize(inputStream);
-    assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+    MessageUtil.performAndVerifyHandshake(socket);
   }
 
   @Test
   public void testInvalidMajorVersionBreaksConnection() throws Exception {
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE);
+    Handshake.NewConnectionHandshake.newBuilder().setMajorVersion(2000)
+        .setMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build()
+        .writeDelimitedTo(socket.getOutputStream());
 
-    // Verify that connection is closed
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
-      try {
-        assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
-        return true;
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
-
-  @Test
-  public void testInvalidMinorVersionBreaksConnectionAfterResponse() throws Exception {
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    ClientProtocol.Message handshakeResponse = protobufProtocolSerializer.deserialize(inputStream);
-    assertFalse(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+    Handshake.HandshakeAcknowledgement handshakeResponse =
+        Handshake.HandshakeAcknowledgement.parseDelimitedFrom(socket.getInputStream());
+    assertFalse(handshakeResponse.getHandshakePassed());
 
     // Verify that connection is closed
     Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
@@ -150,29 +123,15 @@ public class HandshakeIntegrationTest {
     });
   }
 
+  /**
+   * Protobuf seems to omit values that are set to their default (0). This ruins the serialization
+   * trick we use because the message size changes.
+   */
   @Test
-  public void testUnexpectedHandshakeFailsAndClosesConnection() throws Exception {
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    ClientProtocol.Message handshakeResponse = protobufProtocolSerializer.deserialize(inputStream);
-    assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
-
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    ClientProtocol.Message failingHandshake = protobufProtocolSerializer.deserialize(inputStream);
-    assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
-        failingHandshake.getResponse().getErrorResponse().getError().getErrorCode());
+  public void testMissingMajorVersionBreaksConnection() throws Exception {
+    Handshake.NewConnectionHandshake.newBuilder()
+        .setMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE).setMinorVersion(0)
+        .build().writeDelimitedTo(socket.getOutputStream());
 
     // Verify that connection is closed
     Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
@@ -180,7 +139,8 @@ public class HandshakeIntegrationTest {
         assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
         return true;
       } catch (IOException e) {
-        throw new RuntimeException(e);
+        // Ignore IOExceptions (sometimes socket reset exception is thrown)
+        return true;
       }
     });
   }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java
index 5a10b1f..c998f4a 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java
@@ -14,6 +14,16 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1;
 
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+
+import com.google.protobuf.MessageLite;
+
+import org.apache.geode.internal.protocol.protobuf.Handshake;
 import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
 import org.apache.geode.internal.protocol.serialization.SerializationService;
@@ -22,6 +32,24 @@ import org.apache.geode.internal.protocol.serialization.registry.exception.Codec
 
 public class MessageUtil {
 
+  public static void performAndVerifyHandshake(Socket socket) throws IOException {
+    sendHandshake(socket);
+    verifyHandshakeSuccess(socket);
+  }
+
+  public static void verifyHandshakeSuccess(Socket socket) throws IOException {
+    Handshake.HandshakeAcknowledgement handshakeResponse =
+        Handshake.HandshakeAcknowledgement.parseDelimitedFrom(socket.getInputStream());
+    assertTrue(handshakeResponse.getHandshakePassed());
+  }
+
+  public static void sendHandshake(Socket socket) throws IOException {
+    Handshake.NewConnectionHandshake.newBuilder()
+        .setMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+        .setMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build()
+        .writeDelimitedTo(socket.getOutputStream());
+  }
+
   public static RegionAPI.GetRegionRequest makeGetRegionRequest(String requestRegion) {
     return RegionAPI.GetRegionRequest.newBuilder().setRegionName(requestRegion).build();
   }
@@ -68,4 +96,14 @@ public class MessageUtil {
   private static RegionAPI.GetRequest.Builder getGetRequestBuilder() {
     return RegionAPI.GetRequest.newBuilder();
   }
+
+  public static ByteArrayInputStream writeMessageDelimitedToInputStream(MessageLite message) {
+    try {
+      ByteArrayOutputStream output = new ByteArrayOutputStream();
+      message.writeDelimitedTo(output);
+      return new ByteArrayInputStream(output.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(e); // never happens.
+    }
+  }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java
index bf1127a..7f6327d 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java
@@ -143,8 +143,8 @@ public class CacheConnectionJUnitTest {
     }
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+
+    MessageUtil.performAndVerifyHandshake(socket);
 
     serializationService = new ProtobufSerializationService();
   }
@@ -160,16 +160,6 @@ public class CacheConnectionJUnitTest {
   public void testBasicMessagesAndStats() throws Exception {
     ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
 
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    ClientProtocol.Message handshakeResponse =
-        ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
-    assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
-
     ClientProtocol.Message putMessage =
         MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION);
     protobufProtocolSerializer.serialize(putMessage, outputStream);
@@ -204,7 +194,7 @@ public class CacheConnectionJUnitTest {
     CacheServer cacheServer = cacheServers.stream().findFirst().get();
     AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
         .until(() -> acceptor.getClientServerCnxCount() == 1);
 
     // make a request to the server
@@ -216,7 +206,7 @@ public class CacheConnectionJUnitTest {
     // make sure socket is still open
     assertFalse(socket.isClosed());
     socket.close();
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
         .until(() -> acceptor.getClientServerCnxCount() == 0);
   }
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java
index b14ceb2..4adf37e 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java
@@ -106,8 +106,7 @@ public class CacheConnectionTimeoutJUnitTest {
 
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+    MessageUtil.performAndVerifyHandshake(socket);
 
     serializationService = new ProtobufSerializationService();
 
@@ -155,14 +154,6 @@ public class CacheConnectionTimeoutJUnitTest {
   public void testResponsiveClientsStaysConnected() throws Exception {
     ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
 
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    protobufProtocolSerializer.deserialize(socket.getInputStream());
-
     ClientProtocol.Message putMessage =
         MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION);
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java
index 43287e0..f9489bf 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java
@@ -15,6 +15,7 @@
 
 package org.apache.geode.internal.protocol.protobuf.v1.acceptance;
 
+import static org.apache.geode.internal.protocol.protobuf.v1.MessageUtil.performAndVerifyHandshake;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -53,7 +54,6 @@ import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
@@ -102,9 +102,6 @@ public class CacheMaxConnectionJUnitTest {
 
     socket = new Socket("localhost", cacheServerPort);
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
-    OutputStream outputStream = socket.getOutputStream();
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
 
     serializationService = new ProtobufSerializationService();
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
@@ -189,18 +186,8 @@ public class CacheMaxConnectionJUnitTest {
 
           Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
           OutputStream outputStream = socket.getOutputStream();
-          outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-          outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
-          ClientProtocol.Message.newBuilder()
-              .setRequest(ClientProtocol.Request.newBuilder()
-                  .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                      .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                      .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-              .build().writeDelimitedTo(outputStream);
-          ClientProtocol.Message handshakeResponse =
-              ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
-          assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+
+          performAndVerifyHandshake(socket);
 
           ClientProtocol.Message putMessage = MessageUtil
               .makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION);
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
index b114a21..a828fa5 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
@@ -138,18 +138,8 @@ public class CacheOperationsJUnitTest {
     }
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
-    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-    outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
-    ClientProtocol.Message.newBuilder()
-        .setRequest(ClientProtocol.Request.newBuilder()
-            .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
-                .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-                .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
-        .build().writeDelimitedTo(outputStream);
-    ClientProtocol.Message handshakeResponse =
-        ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
-    assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+
+    MessageUtil.performAndVerifyHandshake(socket);
 
     serializationService = new ProtobufSerializationService();
   }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
index ac432af..fc52d33 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
@@ -41,6 +41,7 @@ import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStat
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil;
 import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
 import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
@@ -72,11 +73,7 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
     Host host = Host.getHost(0);
     int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
     Socket socket = new Socket(host.getHostName(), locatorPort);
-    DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
-    dataOutputStream.writeInt(0);
-    // Using the constant from AcceptorImpl to ensure that magic byte is the same
-    dataOutputStream.writeByte(ProtobufClientServerProtocol.getModeNumber());
-    dataOutputStream.writeByte(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+    MessageUtil.sendHandshake(socket);
     return socket;
   }
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java
new file mode 100644
index 0000000..098b1a2
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java
@@ -0,0 +1,140 @@
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.shiro.subject.Subject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.Result;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil;
+import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.state.ConnectionShiroAuthenticatingStateProcessor;
+import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor;
+import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.serialization.SerializationService;
+import org.apache.geode.internal.protocol.state.ConnectionShiroAuthorizingStateProcessor;
+import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
+import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+@Category(UnitTest.class)
+public class HandshakeHandlerJUnitTest {
+  private static final int INVALID_MAJOR_VERSION = 67;
+  private static final int INVALID_MINOR_VERSION = 92347;
+
+  private HandshakeHandler handshakeHandler = new HandshakeHandler();
+
+  @Test
+  public void testCurrentVersionHandshakeSucceeds() throws Exception {
+    Handshake.NewConnectionHandshake handshakeRequest =
+        generateHandshakeRequest(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+            Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+
+    ByteArrayInputStream inputStream =
+        MessageUtil.writeMessageDelimitedToInputStream(handshakeRequest);
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    assertTrue(handshakeHandler.handleHandshake(inputStream, outputStream,
+        mock(ProtocolClientStatistics.class)));
+
+    Handshake.HandshakeAcknowledgement handshakeResponse = Handshake.HandshakeAcknowledgement
+        .parseDelimitedFrom(new ByteArrayInputStream(outputStream.toByteArray()));
+    assertTrue(handshakeResponse.getHandshakePassed());
+    assertEquals(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+        handshakeResponse.getServerMajorVersion());
+    assertEquals(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE,
+        handshakeResponse.getServerMinorVersion());
+  }
+
+  @Test
+  public void testInvalidMajorVersionFails() throws Exception {
+    assertNotEquals(INVALID_MAJOR_VERSION, Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+
+    Handshake.NewConnectionHandshake handshakeRequest = generateHandshakeRequest(
+        INVALID_MAJOR_VERSION, Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+
+    verifyHandshakeFails(handshakeRequest);
+
+    // Also validate the protobuf INVALID_MAJOR_VERSION_VALUE constant fails
+    handshakeRequest = generateHandshakeRequest(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
+        Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+    verifyHandshakeFails(handshakeRequest);
+  }
+
+  private void verifyHandshakeFails(Handshake.NewConnectionHandshake handshakeRequest)
+      throws Exception {
+    ByteArrayInputStream inputStream =
+        MessageUtil.writeMessageDelimitedToInputStream(handshakeRequest);
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+    try {
+      handshakeHandler.handleHandshake(inputStream, outputStream,
+          mock(ProtocolClientStatistics.class));
+      fail("Invalid handshake should throw IOException");
+    } catch (IOException e) {
+      // expected if handshake verification fails
+    }
+
+    Handshake.HandshakeAcknowledgement handshakeResponse = Handshake.HandshakeAcknowledgement
+        .parseDelimitedFrom(new ByteArrayInputStream(outputStream.toByteArray()));
+
+    assertFalse(handshakeResponse.getHandshakePassed());
+  }
+
+  @Test
+  public void testInvalidMinorVersionFails() throws Exception {
+    assertNotEquals(INVALID_MINOR_VERSION, Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
+
+    Handshake.NewConnectionHandshake handshakeRequest = generateHandshakeRequest(
+        Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, INVALID_MINOR_VERSION);
+
+    verifyHandshakeFails(handshakeRequest);
+
+    // Also validate the protobuf INVALID_MINOR_VERSION_VALUE constant fails
+    handshakeRequest = generateHandshakeRequest(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
+        Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE);
+    verifyHandshakeFails(handshakeRequest);
+  }
+
+  private Handshake.NewConnectionHandshake generateHandshakeRequest(int majorVersion,
+      int minorVersion) {
+    return Handshake.NewConnectionHandshake.newBuilder().setMajorVersion(majorVersion)
+        .setMinorVersion(minorVersion).build();
+  }
+}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java
deleted file mode 100644
index 0baf9bb..0000000
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package org.apache.geode.internal.protocol.protobuf.v1.operations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import org.apache.shiro.subject.Subject;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.protocol.Failure;
-import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.Result;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
-import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
-import org.apache.geode.internal.protocol.protobuf.v1.state.ConnectionShiroAuthenticatingStateProcessor;
-import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor;
-import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufResponseUtilities;
-import org.apache.geode.internal.protocol.serialization.SerializationService;
-import org.apache.geode.internal.protocol.state.ConnectionShiroAuthorizingStateProcessor;
-import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
-import org.apache.geode.internal.protocol.state.exception.ConnectionStateException;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.test.junit.categories.UnitTest;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-@Category(UnitTest.class)
-public class HandshakeRequestOperationHandlerJUnitTest {
-  private static final int INVALID_MAJOR_VERSION = 67;
-  private static final int INVALID_MINOR_VERSION = 92347;
-
-  private HandshakeRequestOperationHandler handshakeHandler =
-      new HandshakeRequestOperationHandler();
-  private SerializationService serializationService = new ProtobufSerializationService();
-  private ProtobufConnectionHandshakeStateProcessor handshakeStateProcessor;
-
-  @Before
-  public void Setup() {
-    handshakeStateProcessor =
-        new ProtobufConnectionHandshakeStateProcessor(mock(SecurityService.class));
-  }
-
-  @Test
-  public void testCurrentVersionHandshakeSucceeds() throws Exception {
-    ConnectionAPI.HandshakeRequest handshakeRequest =
-        generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
-            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
-    MessageExecutionContext messageExecutionContext =
-        new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor);
-    Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> result =
-        handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext);
-    ConnectionAPI.HandshakeResponse handshakeResponse = result.getMessage();
-    assertTrue(handshakeResponse.getHandshakePassed());
-    assertEquals(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
-        handshakeResponse.getServerMajorVersion());
-    assertEquals(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE,
-        handshakeResponse.getServerMinorVersion());
-  }
-
-  @Test
-  public void testInvalidMajorVersionFails() throws Exception {
-    assertNotEquals(INVALID_MAJOR_VERSION, ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
-
-    ConnectionAPI.HandshakeRequest handshakeRequest = generateHandshakeRequest(
-        INVALID_MAJOR_VERSION, ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
-    MessageExecutionContext messageExecutionContext =
-        new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor);
-
-    verifyHandshakeFails(handshakeRequest, messageExecutionContext);
-  }
-
-  @Test
-  public void testInvalidMajorVersionProtocolConstantFails() throws Exception {
-    ConnectionAPI.HandshakeRequest handshakeRequest =
-        generateHandshakeRequest(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
-            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
-    MessageExecutionContext messageExecutionContext =
-        new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor);
-    verifyHandshakeFails(handshakeRequest, messageExecutionContext);
-  }
-
-  private void verifyHandshakeFails(ConnectionAPI.HandshakeRequest handshakeRequest,
-      MessageExecutionContext messageExecutionContext) throws Exception {
-    Result<ConnectionAPI.HandshakeResponse, ClientProtocol.ErrorResponse> result =
-        handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext);
-    ConnectionAPI.HandshakeResponse handshakeResponse = result.getMessage();
-    assertFalse(handshakeResponse.getHandshakePassed());
-  }
-
-  @Test
-  public void testInvalidMinorVersionFails() throws Exception {
-    assertNotEquals(INVALID_MINOR_VERSION, ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
-
-    ConnectionAPI.HandshakeRequest handshakeRequest = generateHandshakeRequest(
-        ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, INVALID_MINOR_VERSION);
-    MessageExecutionContext messageExecutionContext =
-        new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor);
-
-    verifyHandshakeFails(handshakeRequest, messageExecutionContext);
-  }
-
-  @Test
-  public void testInvalidMinorVersionProtocolConstantFails() throws Exception {
-    ConnectionAPI.HandshakeRequest handshakeRequest =
-        generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
-            ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE);
-    MessageExecutionContext messageExecutionContext =
-        new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor);
-
-    verifyHandshakeFails(handshakeRequest, messageExecutionContext);
-  }
-
-  @Test
-  public void testNoSecurityStateFailsHandshake() throws Exception {
-    ConnectionAPI.HandshakeRequest handshakeRequest =
-        generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
-            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
-    MessageExecutionContext messageExecutionContext = new MessageExecutionContext(
-        mock(InternalCache.class), null, new NoSecurityConnectionStateProcessor());
-
-    try {
-      handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext);
-      fail("Handshake in non-handshake state should throw exception");
-    } catch (ConnectionStateException ex) {
-      assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode());
-    }
-  }
-
-  @Test
-  public void testAuthenticatingStateFailsHandshake() throws Exception {
-    ConnectionAPI.HandshakeRequest handshakeRequest =
-        generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
-            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
-    MessageExecutionContext messageExecutionContext =
-        new MessageExecutionContext(mock(InternalCache.class), null,
-            new ConnectionShiroAuthenticatingStateProcessor(mock(SecurityService.class)));
-
-    try {
-      handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext);
-      fail("Handshake in non-handshake state should throw exception");
-    } catch (ConnectionStateException ex) {
-      assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode());
-    }
-  }
-
-  @Test
-  public void testAuthorizingStateFailsHandshake() throws Exception {
-    ConnectionAPI.HandshakeRequest handshakeRequest =
-        generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE,
-            ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE);
-    MessageExecutionContext messageExecutionContext =
-        new MessageExecutionContext(mock(InternalCache.class), null,
-            new ConnectionShiroAuthorizingStateProcessor(mock(SecurityService.class),
-                mock(Subject.class)));
-
-    try {
-      handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext);
-      fail("Handshake in non-handshake state should throw exception");
-    } catch (ConnectionStateException ex) {
-      assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode());
-    }
-  }
-
-  private ConnectionAPI.HandshakeRequest generateHandshakeRequest(int majorVersion,
-      int minorVersion) {
-    return ConnectionAPI.HandshakeRequest.newBuilder().setMajorVersion(majorVersion)
-        .setMinorVersion(minorVersion).build();
-  }
-}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java
index b59d154..e83e6e0 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java
@@ -20,7 +20,7 @@ import static org.junit.Assert.assertTrue;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.Handshake;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -33,11 +33,11 @@ public class VersionValidatorJUnitTest {
   @Test
   public void testInvalidVersions() throws Exception {
     assertFalse(
-        validator.isValid(MAJOR_VERSION, ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE));
+        validator.isValid(MAJOR_VERSION, Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE));
     assertFalse(
-        validator.isValid(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE, MINOR_VERSION));
-    assertFalse(validator.isValid(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
-        ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE));
+        validator.isValid(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE, MINOR_VERSION));
+    assertFalse(validator.isValid(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE,
+        Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE));
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.

Mime
View raw message