Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8993C200D54 for ; Fri, 8 Dec 2017 18:52:35 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 87EDF160C0D; Fri, 8 Dec 2017 17:52:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 339E4160BFD for ; Fri, 8 Dec 2017 18:52:33 +0100 (CET) Received: (qmail 28149 invoked by uid 500); 8 Dec 2017 17:52:32 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 28140 invoked by uid 99); 8 Dec 2017 17:52:32 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Dec 2017 17:52:32 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 2756185792; Fri, 8 Dec 2017 17:52:30 +0000 (UTC) Date: Fri, 08 Dec 2017 17:52:30 +0000 To: "commits@geode.apache.org" Subject: [geode] branch develop updated: GEODE-4059: Changing protobuf handshake to not need communication mode bytes MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <151275555070.21989.13891475681493684714@gitbox.apache.org> From: bschuchardt@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/develop X-Git-Reftype: branch X-Git-Oldrev: ab9d2527cfb6f6a2c45fcd63c11cdde18fcf3301 X-Git-Newrev: 15bb387bd086447ec55b264966819e70d40f608d X-Git-Rev: 15bb387bd086447ec55b264966819e70d40f608d X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated archived-at: Fri, 08 Dec 2017 17:52:35 -0000 This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git The following commit(s) were added to refs/heads/develop by this push: new 15bb387 GEODE-4059: Changing protobuf handshake to not need communication mode bytes 15bb387 is described below commit 15bb387bd086447ec55b264966819e70d40f608d Author: Bruce Schuchardt AuthorDate: Fri Dec 8 09:51:07 2017 -0800 GEODE-4059: Changing protobuf handshake to not need communication mode bytes This closes pull request #1137 Squashed commit of the following: commit d9f5144cd7f7ddcf42eb7eb5febed531cbf67362 Merge: 6fa9bf6c8 8fd707201 Author: Bruce Schuchardt Date: Fri Dec 8 09:34:55 2017 -0800 Merge branch 'feature/GEODE-4059' of https://github.com/WireBaron/geode into WireBaron-feature/GEODE-4059 commit 8fd707201f0e02b1fdf81c1596f93d3fd9ccb0cf Author: Brian Rowe Date: Thu Dec 7 14:53:28 2017 -0800 Removing extra handshake from a unit test. Signed-off-by: Galen O'Sullivan commit 7d07c6f5163d99b92a12ca856a1b397fda6e0eb4 Author: Galen O'Sullivan Date: Wed Dec 6 16:20:25 2017 -0800 GEODE-4059: Changing protobuf handshake to not need communication mode bytes - Moved handshake proto messages into top level handshake.proto - Changed protobuf communication mode to match leading byte of handshake - Updated handshake handler and tests to handle changes - Modified locator to accept protobuf communication mode in lieu of gossip Signed-off-by: Brian Rowe --- .../protocol/state/ConnectionStateProcessor.java | 17 +- .../distributed/internal/tcpserver/TcpServer.java | 202 +++++++++++---------- .../protocol/ClientProtocolServiceLoader.java | 6 +- .../internal/cache/tier/CommunicationMode.java | 25 ++- .../tier/sockets/ServerConnectionFactory.java | 18 +- .../src/main/proto/handshake.proto | 31 ++-- .../src/main/proto/v1/basicTypes.proto | 3 - .../src/main/proto/v1/clientProtocol.proto | 3 - .../src/main/proto/v1/connection_API.proto | 20 -- .../protobuf/v1/ProtobufLocatorPipeline.java | 23 +++ .../protobuf/v1/ProtobufProtocolService.java | 3 +- .../protobuf/v1/ProtobufStreamProcessor.java | 5 + .../protobuf/v1/operations/HandshakeHandler.java | 54 ++++++ .../HandshakeRequestOperationHandler.java | 65 ------- .../protobuf/v1/operations/VersionValidator.java | 10 +- .../registry/ProtobufOperationContextRegistry.java | 8 - .../ProtobufConnectionHandshakeStateProcessor.java | 36 +++- .../protobuf/v1/AuthenticationIntegrationTest.java | 11 +- .../protobuf/v1/AuthorizationIntegrationTest.java | 12 +- .../protobuf/v1/HandshakeIntegrationTest.java | 82 +++------ .../internal/protocol/protobuf/v1/MessageUtil.java | 38 ++++ .../v1/acceptance/CacheConnectionJUnitTest.java | 18 +- .../CacheConnectionTimeoutJUnitTest.java | 11 +- .../v1/acceptance/CacheMaxConnectionJUnitTest.java | 19 +- .../v1/acceptance/CacheOperationsJUnitTest.java | 14 +- .../v1/acceptance/LocatorConnectionDUnitTest.java | 7 +- .../v1/operations/HandshakeHandlerJUnitTest.java | 140 ++++++++++++++ .../HandshakeRequestOperationHandlerJUnitTest.java | 191 ------------------- .../v1/operations/VersionValidatorJUnitTest.java | 10 +- 29 files changed, 492 insertions(+), 590 deletions(-) diff --git a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java index e0d18b3..b55524f 100644 --- a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java +++ b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionStateProcessor.java @@ -14,6 +14,10 @@ */ package org.apache.geode.internal.protocol.state; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + import org.apache.geode.internal.protocol.MessageExecutionContext; import org.apache.geode.internal.protocol.OperationContext; import org.apache.geode.internal.protocol.ProtocolErrorCode; @@ -48,15 +52,14 @@ public interface ConnectionStateProcessor { } /** - * This indicates whether this specific state processor is able to handle handshake requests. + * Allow the state processor to take over the entire processing of a given message. * - * @return specialized ConnectionHandshakingStateProcessor interface implementation which can move - * to a new state - * @throws ConnectionStateException if unable to handle handshakes in this state. + * @return - True if the message has been handled by the state processor, false to continue normal + * processing. */ - default ConnectionHandshakingStateProcessor allowHandshake() throws ConnectionStateException { - throw new ConnectionStateException(ProtocolErrorCode.UNSUPPORTED_OPERATION, - "Requested operation not allowed at this time"); + default boolean handleMessageIndependently(InputStream inputStream, OutputStream outputStream, + MessageExecutionContext executionContext) throws IOException { + return false; } /** diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index aadbfb4..4e002d8 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -44,6 +44,7 @@ import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.SystemFailure; import org.apache.geode.cache.IncompatibleVersionException; +import org.apache.geode.cache.UnsupportedVersionException; import org.apache.geode.distributed.internal.ClusterConfigurationService; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionConfigImpl; @@ -364,7 +365,6 @@ public class TcpServer { executor.execute(() -> { long startTime = DistributionStats.getStatTime(); DataInputStream input = null; - Object request, response; try { socket.setSoTimeout(READ_TIMEOUT); @@ -379,89 +379,20 @@ public class TcpServer { + (socket.getInetAddress().getHostAddress() + ":" + socket.getPort()), e); return; } - int gossipVersion = readGossipVersion(socket, input); - - short versionOrdinal; - if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) { - if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL - && Boolean.getBoolean("geode.feature-protobuf-protocol")) { - try { - int protocolVersion = input.readUnsignedByte(); - ClientProtocolService clientProtocolService = - clientProtocolServiceLoader.lookupService(protocolVersion); - clientProtocolService.initializeStatistics("LocatorStats", - internalLocator.getDistributedSystem()); - try (ClientProtocolProcessor pipeline = - clientProtocolService.createProcessorForLocator(internalLocator)) { - pipeline.processMessage(input, socket.getOutputStream()); - } catch (IncompatibleVersionException e) { - // should not happen on the locator as there is no handshake. - log.error("Unexpected exception in client message processing", e); - } - } catch (ServiceLoadingFailureException e) { - log.error("There was an error looking up the client protocol service", e); - socket.close(); - throw new IOException("There was an error looking up the client protocol service", e); - } catch (ServiceVersionNotFoundException e) { - log.error("Unable to find service matching the client protocol version byte", e); - socket.close(); - throw new IOException( - "Unable to find service matching the client protocol version byte", e); - } - } else { - rejectUnknownProtocolConnection(socket, gossipVersion); - } - } else if (gossipVersion <= getCurrentGossipVersion() - && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) { - // Create a versioned stream to remember sender's GemFire version - versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion); - - if (Version.GFE_71.compareTo(versionOrdinal) <= 0) { - // Recent versions of TcpClient will send the version ordinal - versionOrdinal = input.readShort(); - } - - if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) { - log.debug("Locator reading request from " + socket.getInetAddress() + " with version " - + Version.fromOrdinal(versionOrdinal, false)); - } - input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false)); - request = DataSerializer.readObject(input); - if (log.isDebugEnabled()) { - log.debug("Locator received request " + request + " from " + socket.getInetAddress()); - } - if (request instanceof ShutdownRequest) { - shuttingDown = true; - // Don't call shutdown from within the worker thread, see java bug #6576792. - // Closing the socket will cause our acceptor thread to shutdown the executor - this.serverSocketPortAtClose = srv_sock.getLocalPort(); - srv_sock.close(); - response = new ShutdownResponse(); - } else if (request instanceof InfoRequest) { - response = handleInfoRequest(request); - } else if (request instanceof VersionRequest) { - response = handleVersionRequest(request); - } else { - response = handler.processRequest(request); - } - - handler.endRequest(request, startTime); - - startTime = DistributionStats.getStatTime(); - if (response != null) { - DataOutputStream output = new DataOutputStream(socket.getOutputStream()); - if (versionOrdinal != Version.CURRENT_ORDINAL) { - output = - new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false)); - } - DataSerializer.writeObject(response, output); - output.flush(); - } + // read the first byte & check for an improperly configured client pool trying + // to contact a cache server + int firstByte = input.readUnsignedByte(); + if (firstByte == CommunicationMode.ReservedForGossip.getModeNumber()) { + processOneConnection(socket, startTime, input); + } else if (firstByte == CommunicationMode.ProtobufClientServerProtocol.getModeNumber()) { + handleProtobufConnection(socket, input); + } else if (CommunicationMode.isValidMode(firstByte)) { + socket.getOutputStream().write(HandShake.REPLY_SERVER_IS_LOCATOR); + throw new Exception("Improperly configured client detected - use addPoolLocator to " + + "configure its locators instead of addPoolServer."); - handler.endResponse(request, startTime); } else { - // Close the socket. We can not accept requests from a newer version - rejectUnknownProtocolConnection(socket, gossipVersion); + rejectUnknownProtocolConnection(socket, firstByte); } } catch (EOFException ignore) { // client went away - ignore @@ -516,32 +447,109 @@ public class TcpServer { }); } - private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion) - throws IOException { + private void processOneConnection(Socket socket, long startTime, DataInputStream input) + throws IOException, UnsupportedVersionException, ClassNotFoundException { + // At this point we've read the leading byte of the gossip version and found it to be 0, + // continue reading the next three bytes + int gossipVersion = 0; + for (int i = 0; i < 3; i++) { + gossipVersion = (gossipVersion << 8) + (0xff & input.readUnsignedByte()); + } + + Object request; + Object response; + short versionOrdinal; + if (gossipVersion <= getCurrentGossipVersion() + && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) { + // Create a versioned stream to remember sender's GemFire version + versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion); + + if (Version.GFE_71.compareTo(versionOrdinal) <= 0) { + // Recent versions of TcpClient will send the version ordinal + versionOrdinal = input.readShort(); + } + + if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) { + log.debug("Locator reading request from " + socket.getInetAddress() + " with version " + + Version.fromOrdinal(versionOrdinal, false)); + } + input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false)); + request = DataSerializer.readObject(input); + if (log.isDebugEnabled()) { + log.debug("Locator received request " + request + " from " + socket.getInetAddress()); + } + if (request instanceof ShutdownRequest) { + shuttingDown = true; + // Don't call shutdown from within the worker thread, see java bug #6576792. + // Closing the socket will cause our acceptor thread to shutdown the executor + this.serverSocketPortAtClose = srv_sock.getLocalPort(); + srv_sock.close(); + response = new ShutdownResponse(); + } else if (request instanceof InfoRequest) { + response = handleInfoRequest(request); + } else if (request instanceof VersionRequest) { + response = handleVersionRequest(request); + } else { + response = handler.processRequest(request); + } + + handler.endRequest(request, startTime); + + startTime = DistributionStats.getStatTime(); + if (response != null) { + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); + if (versionOrdinal != Version.CURRENT_ORDINAL) { + output = + new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false)); + } + DataSerializer.writeObject(response, output); + output.flush(); + } + + handler.endResponse(request, startTime); + } else { + // Close the socket. We can not accept requests from a newer version + rejectUnknownProtocolConnection(socket, gossipVersion); + } + } + + private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion) { try { socket.getOutputStream().write("unknown protocol version".getBytes()); socket.getOutputStream().flush(); + socket.close(); } catch (IOException e) { log.debug("exception in sending reply to process using unknown protocol " + gossipVersion, e); } - socket.close(); } - private int readGossipVersion(Socket sock, DataInputStream input) throws Exception { - // read the first byte & check for an improperly configured client pool trying - // to contact a cache server - int firstByte = input.readUnsignedByte(); - if (CommunicationMode.isValidMode(firstByte)) { - sock.getOutputStream().write(HandShake.REPLY_SERVER_IS_LOCATOR); - throw new Exception("Improperly configured client detected - use addPoolLocator to " - + "configure its locators instead of addPoolServer."); + private void handleProtobufConnection(Socket socket, DataInputStream input) throws Exception { + if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) { + log.warn("Incoming protobuf connection, but protobuf not enabled on this locator."); + socket.close(); + return; } - int gossipVersion = firstByte; - for (int i = 0; i < 3; i++) { - gossipVersion = (gossipVersion << 8) + (0xff & input.readUnsignedByte()); + try { + ClientProtocolService clientProtocolService = clientProtocolServiceLoader.lookupService(); + clientProtocolService.initializeStatistics("LocatorStats", + internalLocator.getDistributedSystem()); + try (ClientProtocolProcessor pipeline = + clientProtocolService.createProcessorForLocator(internalLocator)) { + pipeline.processMessage(input, socket.getOutputStream()); + } catch (IncompatibleVersionException e) { + // should not happen on the locator as there is no handshake. + log.error("Unexpected exception in client message processing", e); + } + } catch (ServiceLoadingFailureException e) { + log.error("There was an error looking up the client protocol service", e); + socket.close(); + throw new IOException("There was an error looking up the client protocol service", e); + } catch (ServiceVersionNotFoundException e) { + log.error("Unable to find service matching the client protocol version byte", e); + socket.close(); + throw new IOException("Unable to find service matching the client protocol version byte", e); } - return gossipVersion; } protected Object handleInfoRequest(Object request) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java index 4b66062..bf85bbc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java @@ -39,7 +39,7 @@ public class ClientProtocolServiceLoader { return resultList; } - public ClientProtocolService lookupService(int protocolVersion) { + public ClientProtocolService lookupService() { if (clientProtocolServices.isEmpty()) { throw new ServiceLoadingFailureException( "There is no ClientProtocolService implementation found in JVM"); @@ -50,10 +50,6 @@ public class ClientProtocolServiceLoader { "There is more than one ClientProtocolService implementation found in JVM; aborting"); } ClientProtocolService clientProtocolService = clientProtocolServices.get(0); - if (clientProtocolService.getServiceProtocolVersion() != protocolVersion) { - throw new ServiceVersionNotFoundException( - "The ClientProtocolService doesn't match the requested version."); - } return clientProtocolService; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java index 053a556..e57b850 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java @@ -23,6 +23,19 @@ package org.apache.geode.internal.cache.tier; */ public enum CommunicationMode { /** + * The first byte of any locator connection will be the high order byte of its gossip version, + * which will always be 0. Communication modes should not collide with this value. + */ + ReservedForGossip((byte) 0, "Locator gossip version"), + /** + * For the new client-server protocol. + * + * Protobuf handshake messages are specially constructed so that this value will match the first + * byte sent, allowing clients to start protobuf connections with a handshake instead of + * communication mode bytes. + */ + ProtobufClientServerProtocol((byte) 10, "Protobuf client"), + /** * Byte meaning that the Socket is being used for 'client to server' communication. */ ClientToServer((byte) 100, "client"), @@ -56,11 +69,7 @@ public enum CommunicationMode { * Byte meaning that the Socket is being used for 'client to server' messages related to a client * queue (register interest, create cq, etc.). */ - ClientToServerForQueue((byte) 107, "clientToServerForQueue"), - /** - * For the new client-server protocol, which ignores the usual handshake mechanism. - */ - ProtobufClientServerProtocol((byte) 110, "Protobuf client"); + ClientToServerForQueue((byte) 107, "clientToServerForQueue"); /** * is this a client-initiated operations connection? @@ -128,11 +137,13 @@ public enum CommunicationMode { * check the given mode to see if it is assigned to one of the enumeration's instances */ public static boolean isValidMode(int mode) { - return 100 <= mode && mode <= 110; + return (100 <= mode && mode <= 107) || mode == 10; } public static CommunicationMode fromModeNumber(byte modeNumber) { switch (modeNumber) { + case 10: + return ProtobufClientServerProtocol; case 100: return ClientToServer; case 101: @@ -149,8 +160,6 @@ public enum CommunicationMode { return UnsuccessfulServerToClient; case 107: return ClientToServerForQueue; - case 110: - return ProtobufClientServerProtocol; default: throw new IllegalArgumentException("unknown communications mode: " + modeNumber); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java index 7070b37..59d4533 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java @@ -44,9 +44,9 @@ public class ServerConnectionFactory { private synchronized ClientProtocolService getClientProtocolService( - StatisticsFactory statisticsFactory, String serverName, int protocolVersion) { + StatisticsFactory statisticsFactory, String serverName) { if (clientProtocolService == null) { - clientProtocolService = clientProtocolServiceLoader.lookupService(protocolVersion); + clientProtocolService = clientProtocolServiceLoader.lookupService(); clientProtocolService.initializeStatistics(serverName, statisticsFactory); } return clientProtocolService; @@ -60,11 +60,9 @@ public class ServerConnectionFactory { if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) { throw new IOException("Server received unknown communication mode: " + communicationMode); } else { - int protocolVersion = readProtocolVersionByte(socket); try { return createGenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout, - socketBufferSize, communicationModeStr, communicationMode, acceptor, securityService, - protocolVersion); + socketBufferSize, communicationModeStr, communicationMode, acceptor, securityService); } catch (ServiceLoadingFailureException ex) { throw new IOException("Could not load protobuf client protocol", ex); } catch (ServiceVersionNotFoundException ex) { @@ -77,16 +75,12 @@ public class ServerConnectionFactory { } } - private int readProtocolVersionByte(Socket socket) throws IOException { - return socket.getInputStream().read(); - } - private ServerConnection createGenericProtocolServerConnection(Socket socket, InternalCache cache, CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr, byte communicationMode, Acceptor acceptor, - SecurityService securityService, int protocolVersion) { - ClientProtocolService service = getClientProtocolService(cache.getDistributedSystem(), - acceptor.getServerName(), protocolVersion); + SecurityService securityService) { + ClientProtocolService service = + getClientProtocolService(cache.getDistributedSystem(), acceptor.getServerName()); ClientProtocolProcessor processor = service.createProcessorForCache(cache, securityService); diff --git a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java b/geode-protobuf-messages/src/main/proto/handshake.proto similarity index 50% rename from geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java rename to geode-protobuf-messages/src/main/proto/handshake.proto index f885bfc..b3eebf6 100644 --- a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/state/ConnectionHandshakingStateProcessor.java +++ b/geode-protobuf-messages/src/main/proto/handshake.proto @@ -1,7 +1,7 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at * @@ -12,17 +12,26 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.apache.geode.internal.protocol.state; -import org.apache.geode.internal.protocol.state.exception.ConnectionStateException; +syntax = "proto3"; +package org.apache.geode.internal.protocol.protobuf; -public interface ConnectionHandshakingStateProcessor extends ConnectionStateProcessor { - @Override - default ConnectionHandshakingStateProcessor allowHandshake() throws ConnectionStateException { - return this; - } +enum MajorVersions { + INVALID_MAJOR_VERSION = 0; // Protobuf requires 0 based enum + CURRENT_MAJOR_VERSION = 1; // Initial message structure and handshake protocol +} +enum MinorVersions { + INVALID_MINOR_VERSION = 0; // Protobuf requires 0 based enum + CURRENT_MINOR_VERSION = 1; // Protobuf implementation at initial release +} + +message NewConnectionHandshake { + fixed32 majorVersion = 1; + fixed32 minorVersion = 2; +} - // This is called when a handshake operation succeeds to get the processor for the next connection - // state. - ConnectionStateProcessor handshakeSucceeded(); +message HandshakeAcknowledgement { + int32 serverMajorVersion = 1; + int32 serverMinorVersion = 2; + bool handshakePassed = 3; } diff --git a/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto b/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto index 4fd0c96..d6be91b 100644 --- a/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto +++ b/geode-protobuf-messages/src/main/proto/v1/basicTypes.proto @@ -22,9 +22,6 @@ syntax = "proto3"; package org.apache.geode.internal.protocol.protobuf.v1; -import "google/protobuf/any.proto"; -import "google/protobuf/empty.proto"; - message Entry { EncodedValue key = 1; EncodedValue value = 2; diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto index 992c8d6..b744fa6 100644 --- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto +++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto @@ -21,7 +21,6 @@ syntax = "proto3"; package org.apache.geode.internal.protocol.protobuf.v1; -import "google/protobuf/any.proto"; import "v1/region_API.proto"; import "v1/locator_API.proto"; import "v1/basicTypes.proto"; @@ -47,7 +46,6 @@ message Request { GetRegionRequest getRegionRequest = 42; AuthenticationRequest authenticationRequest = 100; - HandshakeRequest handshakeRequest = 101; } } @@ -64,7 +62,6 @@ message Response { GetRegionResponse getRegionResponse = 42; AuthenticationResponse authenticationResponse = 100; - HandshakeResponse handshakeResponse = 101; ErrorResponse errorResponse = 1000; } diff --git a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto index 176dc0d..7c4435e 100644 --- a/geode-protobuf-messages/src/main/proto/v1/connection_API.proto +++ b/geode-protobuf-messages/src/main/proto/v1/connection_API.proto @@ -16,26 +16,6 @@ syntax = "proto3"; package org.apache.geode.internal.protocol.protobuf.v1; -enum MajorVersions { - INVALID_MAJOR_VERSION = 0; // Protobuf requires 0 based enum - CURRENT_MAJOR_VERSION = 1; // Initial message structure and handshake protocol -} -enum MinorVersions { - INVALID_MINOR_VERSION = 0; // Protobuf requires 0 based enum - CURRENT_MINOR_VERSION = 1; // Protobuf implementation at initial release -} - -message HandshakeRequest { - int32 majorVersion = 1; - int32 minorVersion = 2; -} - -message HandshakeResponse { - int32 serverMajorVersion = 1; - int32 serverMinorVersion = 2; - bool handshakePassed = 3; -} - message AuthenticationRequest { map credentials = 1; } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java index d67897f..0a30ae4 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java @@ -18,12 +18,16 @@ package org.apache.geode.internal.protocol.protobuf.v1; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.PushbackInputStream; import org.apache.geode.annotations.Experimental; import org.apache.geode.cache.IncompatibleVersionException; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor; +import org.apache.geode.internal.cache.tier.CommunicationMode; import org.apache.geode.internal.protocol.MessageExecutionContext; +import org.apache.geode.internal.protocol.protobuf.Handshake; +import org.apache.geode.internal.protocol.protobuf.v1.operations.VersionValidator; import org.apache.geode.internal.protocol.state.ConnectionStateProcessor; import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor; import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics; @@ -34,6 +38,7 @@ public final class ProtobufLocatorPipeline implements ClientProtocolProcessor { private final InternalLocator locator; private final ProtobufStreamProcessor streamProcessor; private final ConnectionStateProcessor locatorConnectionState; + private final VersionValidator validator; ProtobufLocatorPipeline(ProtobufStreamProcessor protobufStreamProcessor, ProtocolClientStatistics statistics, InternalLocator locator) { @@ -42,11 +47,13 @@ public final class ProtobufLocatorPipeline implements ClientProtocolProcessor { this.locator = locator; this.statistics.clientConnected(); this.locatorConnectionState = new NoSecurityConnectionStateProcessor(); + this.validator = new VersionValidator(); } @Override public void processMessage(InputStream inputStream, OutputStream outputStream) throws IOException, IncompatibleVersionException { + handleHandshakeMessage(inputStream); streamProcessor.receiveMessage(inputStream, outputStream, new MessageExecutionContext(locator, statistics, locatorConnectionState)); } @@ -61,4 +68,20 @@ public final class ProtobufLocatorPipeline implements ClientProtocolProcessor { // All locator connections are closed after one message, so this is not used return false; } + + private void handleHandshakeMessage(InputStream inputStream) throws IOException { + // Incoming connection had the first byte removed to determine communication mode, add that + // back before parsing. + PushbackInputStream handshakeStream = new PushbackInputStream(inputStream); + handshakeStream.unread(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); + + Handshake.NewConnectionHandshake handshakeRequest = + Handshake.NewConnectionHandshake.parseDelimitedFrom(handshakeStream); + int majorVersion = handshakeRequest.getMajorVersion(); + int minorVersion = handshakeRequest.getMinorVersion(); + if (!validator.isValid(majorVersion, minorVersion)) { + throw new IOException( + "Invalid protobuf client version number: " + majorVersion + "." + minorVersion); + } + } } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java index 63660a9..23fc804 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java @@ -19,6 +19,7 @@ import org.apache.geode.cache.Cache; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor; import org.apache.geode.internal.cache.client.protocol.ClientProtocolService; +import org.apache.geode.internal.protocol.protobuf.Handshake; import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatisticsImpl; import org.apache.geode.internal.protocol.statistics.NoOpStatistics; import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics; @@ -62,6 +63,6 @@ public class ProtobufProtocolService implements ClientProtocolService { @Override public int getServiceProtocolVersion() { - return ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE; + return Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE; } } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java index 543454f..be1c276 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java @@ -61,6 +61,11 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler { private void processOneMessage(InputStream inputStream, OutputStream outputStream, MessageExecutionContext executionContext) throws InvalidProtocolMessageException, IOException { + if (executionContext.getConnectionStateProcessor().handleMessageIndependently(inputStream, + outputStream, executionContext)) { + return; + } + ClientProtocol.Message message = protobufProtocolSerializer.deserialize(inputStream); if (message == null) { String errorMessage = "Tried to deserialize protobuf message at EOF"; diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java new file mode 100644 index 0000000..0132196 --- /dev/null +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandler.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.protocol.protobuf.v1.operations; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.apache.geode.internal.protocol.protobuf.Handshake; +import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics; + +public class HandshakeHandler { + private static final Logger logger = LogManager.getLogger(); + private static final VersionValidator validator = new VersionValidator(); + + public static boolean handleHandshake(InputStream inputStream, OutputStream outputStream, + ProtocolClientStatistics statistics) throws IOException { + Handshake.NewConnectionHandshake handshakeRequest = + Handshake.NewConnectionHandshake.parseDelimitedFrom(inputStream); + + statistics.messageReceived(handshakeRequest.getSerializedSize()); + + final boolean handshakeSucceeded = + validator.isValid(handshakeRequest.getMajorVersion(), handshakeRequest.getMinorVersion()); + + Handshake.HandshakeAcknowledgement handshakeResponse = Handshake.HandshakeAcknowledgement + .newBuilder().setServerMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) + .setServerMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE) + .setHandshakePassed(handshakeSucceeded).build(); + + handshakeResponse.writeDelimitedTo(outputStream); + statistics.messageSent(handshakeResponse.getSerializedSize()); + if (!handshakeSucceeded) { + throw new IOException("Incompatible protobuf version."); + } + + return handshakeSucceeded; + } +} diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java deleted file mode 100644 index 97338e6..0000000 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandler.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.protocol.protobuf.v1.operations; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import org.apache.geode.internal.exception.InvalidExecutionContextException; -import org.apache.geode.internal.protocol.Failure; -import org.apache.geode.internal.protocol.MessageExecutionContext; -import org.apache.geode.internal.protocol.Result; -import org.apache.geode.internal.protocol.Success; -import org.apache.geode.internal.protocol.operations.OperationHandler; -import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; -import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; -import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufResponseUtilities; -import org.apache.geode.internal.protocol.serialization.SerializationService; -import org.apache.geode.internal.protocol.state.ConnectionHandshakingStateProcessor; -import org.apache.geode.internal.protocol.state.ConnectionStateProcessor; -import org.apache.geode.internal.protocol.state.ConnectionTerminatingStateProcessor; -import org.apache.geode.internal.protocol.state.exception.ConnectionStateException; - -public class HandshakeRequestOperationHandler implements - OperationHandler { - private static final Logger logger = LogManager.getLogger(); - private final VersionValidator validator = new VersionValidator(); - - @Override - public Result process( - SerializationService serializationService, ConnectionAPI.HandshakeRequest request, - MessageExecutionContext messageExecutionContext) - throws InvalidExecutionContextException, ConnectionStateException { - ConnectionHandshakingStateProcessor stateProcessor; - - // If handshake not allowed by this state this will throw a ConnectionStateException - stateProcessor = messageExecutionContext.getConnectionStateProcessor().allowHandshake(); - - final boolean handshakeSucceeded = - validator.isValid(request.getMajorVersion(), request.getMinorVersion()); - if (handshakeSucceeded) { - ConnectionStateProcessor nextStateProcessor = stateProcessor.handshakeSucceeded(); - messageExecutionContext.setConnectionStateProcessor(nextStateProcessor); - } else { - messageExecutionContext - .setConnectionStateProcessor(new ConnectionTerminatingStateProcessor()); - } - - return Success.of(ConnectionAPI.HandshakeResponse.newBuilder() - .setServerMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setServerMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE) - .setHandshakePassed(handshakeSucceeded).build()); - } -} diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java index 86eea86..9d6065b 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidator.java @@ -14,15 +14,15 @@ */ package org.apache.geode.internal.protocol.protobuf.v1.operations; -import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; +import org.apache.geode.internal.protocol.protobuf.Handshake; public class VersionValidator { private int majorVersion; private int minorVersion; public VersionValidator() { - this(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, - ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE); + this(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, + Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE); } VersionValidator(int majorVersion, int minorVersion) { @@ -31,9 +31,9 @@ public class VersionValidator { } public boolean isValid(int majorVersion, int minorVersion) { - if (majorVersion != ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE + if (majorVersion != Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE && majorVersion == this.majorVersion) { - if (minorVersion != ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE + if (minorVersion != Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE && minorVersion <= this.minorVersion) { return true; } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java index 8109e6f..065bd5d 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java @@ -27,7 +27,6 @@ import org.apache.geode.internal.protocol.protobuf.v1.operations.GetAvailableSer import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionNamesRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRequestOperationHandler; -import org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.PutAllRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.PutRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.RemoveRequestOperationHandler; @@ -110,12 +109,5 @@ public class ProtobufOperationContextRegistry { opsResp -> ClientProtocol.Response.newBuilder().setGetAvailableServersResponse(opsResp), new ResourcePermission(ResourcePermission.Resource.CLUSTER, ResourcePermission.Operation.READ))); - - operationContexts.put(RequestAPICase.HANDSHAKEREQUEST, - new ProtobufOperationContext<>(ClientProtocol.Request::getHandshakeRequest, - new HandshakeRequestOperationHandler(), - opsResp -> ClientProtocol.Response.newBuilder().setHandshakeResponse(opsResp), - new ResourcePermission(ResourcePermission.Resource.DATA, - ResourcePermission.Operation.READ))); } } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java index 58fa33a..ec39ec9 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java @@ -14,19 +14,23 @@ */ package org.apache.geode.internal.protocol.protobuf.v1.state; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PushbackInputStream; + +import org.apache.geode.internal.cache.tier.CommunicationMode; import org.apache.geode.internal.protocol.MessageExecutionContext; import org.apache.geode.internal.protocol.OperationContext; import org.apache.geode.internal.protocol.ProtocolErrorCode; -import org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeRequestOperationHandler; -import org.apache.geode.internal.protocol.state.ConnectionHandshakingStateProcessor; +import org.apache.geode.internal.protocol.protobuf.v1.operations.HandshakeHandler; import org.apache.geode.internal.protocol.state.ConnectionStateProcessor; import org.apache.geode.internal.protocol.state.LegacySecurityConnectionStateProcessor; import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor; import org.apache.geode.internal.protocol.state.exception.ConnectionStateException; import org.apache.geode.internal.security.SecurityService; -public class ProtobufConnectionHandshakeStateProcessor - implements ConnectionHandshakingStateProcessor { +public class ProtobufConnectionHandshakeStateProcessor implements ConnectionStateProcessor { private final SecurityService securityService; public ProtobufConnectionHandshakeStateProcessor(SecurityService securityService) { @@ -36,14 +40,11 @@ public class ProtobufConnectionHandshakeStateProcessor @Override public void validateOperation(MessageExecutionContext messageContext, OperationContext operationContext) throws ConnectionStateException { - if (!(operationContext.getOperationHandler() instanceof HandshakeRequestOperationHandler)) { - throw new ConnectionStateException(ProtocolErrorCode.HANDSHAKE_REQUIRED, - "Protobuf handshake must be completed before any other operation."); - } + throw new ConnectionStateException(ProtocolErrorCode.GENERIC_FAILURE, + "Connection processing should never be asked to validate an operation"); } - @Override - public ConnectionStateProcessor handshakeSucceeded() { + private ConnectionStateProcessor nextConnectionState() { if (securityService.isIntegratedSecurity()) { return new ConnectionShiroAuthenticatingStateProcessor(securityService); } else if (securityService.isPeerSecurityRequired() @@ -54,4 +55,19 @@ public class ProtobufConnectionHandshakeStateProcessor return new NoSecurityConnectionStateProcessor(); } } + + @Override + public boolean handleMessageIndependently(InputStream inputStream, OutputStream outputStream, + MessageExecutionContext executionContext) throws IOException { + // inputStream will have had the first byte stripped off to determine communication mode, add + // that byte back before processing message + PushbackInputStream messageStream = new PushbackInputStream(inputStream); + messageStream.unread(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); + + if (HandshakeHandler.handleHandshake(messageStream, outputStream, + executionContext.getStatistics())) { + executionContext.setConnectionStateProcessor(nextConnectionState()); + } + return true; + } } diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java index c3b6c73..f3f68f4 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthenticationIntegrationTest.java @@ -87,19 +87,10 @@ public class AuthenticationIntegrationTest { Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); outputStream = socket.getOutputStream(); inputStream = socket.getInputStream(); - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); protobufProtocolSerializer = new ProtobufProtocolSerializer(); - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message handshakeResponse = protobufProtocolSerializer.deserialize(inputStream); - assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed()); + MessageUtil.performAndVerifyHandshake(socket); } private static class SimpleSecurityManager implements SecurityManager { diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java index c78a75d..cc69333 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java @@ -107,23 +107,13 @@ public class AuthorizationIntegrationTest { Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); outputStream = socket.getOutputStream(); inputStream = socket.getInputStream(); - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); serializationService = new ProtobufSerializationService(); protobufProtocolSerializer = new ProtobufProtocolSerializer(); when(mockSecurityManager.authorize(same(securityPrincipal), any())).thenReturn(false); - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message handshakeResponse = - ClientProtocol.Message.parseDelimitedFrom(inputStream);; - assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed()); + MessageUtil.performAndVerifyHandshake(socket); ClientProtocol.Message authenticationRequest = ClientProtocol.Message.newBuilder() .setRequest(ClientProtocol.Request.newBuilder() diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java index de3038f..0d3fd24 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/HandshakeIntegrationTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -40,8 +41,10 @@ import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.ConfigurationProperties; import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.CacheServerImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.CommunicationMode; -import org.apache.geode.internal.protocol.ProtocolErrorCode; +import org.apache.geode.internal.protocol.protobuf.Handshake; import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer; import org.apache.geode.test.junit.categories.IntegrationTest; @@ -96,48 +99,18 @@ public class HandshakeIntegrationTest { @Test public void testNormalHandshakeSucceeds() throws Exception { - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); - - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message handshakeResponse = protobufProtocolSerializer.deserialize(inputStream); - assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed()); + MessageUtil.performAndVerifyHandshake(socket); } @Test public void testInvalidMajorVersionBreaksConnection() throws Exception { - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE); + Handshake.NewConnectionHandshake.newBuilder().setMajorVersion(2000) + .setMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build() + .writeDelimitedTo(socket.getOutputStream()); - // Verify that connection is closed - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { - try { - assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected. - return true; - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - - @Test - public void testInvalidMinorVersionBreaksConnectionAfterResponse() throws Exception { - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); - - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message handshakeResponse = protobufProtocolSerializer.deserialize(inputStream); - assertFalse(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed()); + Handshake.HandshakeAcknowledgement handshakeResponse = + Handshake.HandshakeAcknowledgement.parseDelimitedFrom(socket.getInputStream()); + assertFalse(handshakeResponse.getHandshakePassed()); // Verify that connection is closed Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { @@ -150,29 +123,15 @@ public class HandshakeIntegrationTest { }); } + /** + * Protobuf seems to omit values that are set to their default (0). This ruins the serialization + * trick we use because the message size changes. + */ @Test - public void testUnexpectedHandshakeFailsAndClosesConnection() throws Exception { - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); - - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message handshakeResponse = protobufProtocolSerializer.deserialize(inputStream); - assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed()); - - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message failingHandshake = protobufProtocolSerializer.deserialize(inputStream); - assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue, - failingHandshake.getResponse().getErrorResponse().getError().getErrorCode()); + public void testMissingMajorVersionBreaksConnection() throws Exception { + Handshake.NewConnectionHandshake.newBuilder() + .setMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE).setMinorVersion(0) + .build().writeDelimitedTo(socket.getOutputStream()); // Verify that connection is closed Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { @@ -180,7 +139,8 @@ public class HandshakeIntegrationTest { assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected. return true; } catch (IOException e) { - throw new RuntimeException(e); + // Ignore IOExceptions (sometimes socket reset exception is thrown) + return true; } }); } diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java index 5a10b1f..c998f4a 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/MessageUtil.java @@ -14,6 +14,16 @@ */ package org.apache.geode.internal.protocol.protobuf.v1; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.Socket; + +import com.google.protobuf.MessageLite; + +import org.apache.geode.internal.protocol.protobuf.Handshake; import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufRequestUtilities; import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities; import org.apache.geode.internal.protocol.serialization.SerializationService; @@ -22,6 +32,24 @@ import org.apache.geode.internal.protocol.serialization.registry.exception.Codec public class MessageUtil { + public static void performAndVerifyHandshake(Socket socket) throws IOException { + sendHandshake(socket); + verifyHandshakeSuccess(socket); + } + + public static void verifyHandshakeSuccess(Socket socket) throws IOException { + Handshake.HandshakeAcknowledgement handshakeResponse = + Handshake.HandshakeAcknowledgement.parseDelimitedFrom(socket.getInputStream()); + assertTrue(handshakeResponse.getHandshakePassed()); + } + + public static void sendHandshake(Socket socket) throws IOException { + Handshake.NewConnectionHandshake.newBuilder() + .setMajorVersion(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) + .setMinorVersion(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build() + .writeDelimitedTo(socket.getOutputStream()); + } + public static RegionAPI.GetRegionRequest makeGetRegionRequest(String requestRegion) { return RegionAPI.GetRegionRequest.newBuilder().setRegionName(requestRegion).build(); } @@ -68,4 +96,14 @@ public class MessageUtil { private static RegionAPI.GetRequest.Builder getGetRequestBuilder() { return RegionAPI.GetRequest.newBuilder(); } + + public static ByteArrayInputStream writeMessageDelimitedToInputStream(MessageLite message) { + try { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + message.writeDelimitedTo(output); + return new ByteArrayInputStream(output.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); // never happens. + } + } } diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java index bf1127a..7f6327d 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionJUnitTest.java @@ -143,8 +143,8 @@ public class CacheConnectionJUnitTest { } Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); outputStream = socket.getOutputStream(); - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); + + MessageUtil.performAndVerifyHandshake(socket); serializationService = new ProtobufSerializationService(); } @@ -160,16 +160,6 @@ public class CacheConnectionJUnitTest { public void testBasicMessagesAndStats() throws Exception { ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message handshakeResponse = - ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream()); - assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed()); - ClientProtocol.Message putMessage = MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION); protobufProtocolSerializer.serialize(putMessage, outputStream); @@ -204,7 +194,7 @@ public class CacheConnectionJUnitTest { CacheServer cacheServer = cacheServers.stream().findFirst().get(); AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor(); - Awaitility.await().atMost(30, TimeUnit.SECONDS) + Awaitility.await().atMost(5, TimeUnit.SECONDS) .until(() -> acceptor.getClientServerCnxCount() == 1); // make a request to the server @@ -216,7 +206,7 @@ public class CacheConnectionJUnitTest { // make sure socket is still open assertFalse(socket.isClosed()); socket.close(); - Awaitility.await().atMost(30, TimeUnit.SECONDS) + Awaitility.await().atMost(5, TimeUnit.SECONDS) .until(() -> acceptor.getClientServerCnxCount() == 0); } diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java index b14ceb2..4adf37e 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheConnectionTimeoutJUnitTest.java @@ -106,8 +106,7 @@ public class CacheConnectionTimeoutJUnitTest { Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); outputStream = socket.getOutputStream(); - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); + MessageUtil.performAndVerifyHandshake(socket); serializationService = new ProtobufSerializationService(); @@ -155,14 +154,6 @@ public class CacheConnectionTimeoutJUnitTest { public void testResponsiveClientsStaysConnected() throws Exception { ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - protobufProtocolSerializer.deserialize(socket.getInputStream()); - ClientProtocol.Message putMessage = MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION); diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java index 43287e0..f9489bf 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheMaxConnectionJUnitTest.java @@ -15,6 +15,7 @@ package org.apache.geode.internal.protocol.protobuf.v1.acceptance; +import static org.apache.geode.internal.protocol.protobuf.v1.MessageUtil.performAndVerifyHandshake; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -53,7 +54,6 @@ import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; -import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil; import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer; @@ -102,9 +102,6 @@ public class CacheMaxConnectionJUnitTest { socket = new Socket("localhost", cacheServerPort); Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); - OutputStream outputStream = socket.getOutputStream(); - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); serializationService = new ProtobufSerializationService(); protobufProtocolSerializer = new ProtobufProtocolSerializer(); @@ -189,18 +186,8 @@ public class CacheMaxConnectionJUnitTest { Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); OutputStream outputStream = socket.getOutputStream(); - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); - - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message handshakeResponse = - ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream()); - assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed()); + + performAndVerifyHandshake(socket); ClientProtocol.Message putMessage = MessageUtil .makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION); diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java index b114a21..a828fa5 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java @@ -138,18 +138,8 @@ public class CacheOperationsJUnitTest { } Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); outputStream = socket.getOutputStream(); - outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); - - ClientProtocol.Message.newBuilder() - .setRequest(ClientProtocol.Request.newBuilder() - .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder() - .setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE))) - .build().writeDelimitedTo(outputStream); - ClientProtocol.Message handshakeResponse = - ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream()); - assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed()); + + MessageUtil.performAndVerifyHandshake(socket); serializationService = new ProtobufSerializationService(); } diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java index ac432af..fc52d33 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java @@ -41,6 +41,7 @@ import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStat import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI; +import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil; import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer; import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufRequestUtilities; import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities; @@ -72,11 +73,7 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase { Host host = Host.getHost(0); int locatorPort = DistributedTestUtils.getDUnitLocatorPort(); Socket socket = new Socket(host.getHostName(), locatorPort); - DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream()); - dataOutputStream.writeInt(0); - // Using the constant from AcceptorImpl to ensure that magic byte is the same - dataOutputStream.writeByte(ProtobufClientServerProtocol.getModeNumber()); - dataOutputStream.writeByte(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); + MessageUtil.sendHandshake(socket); return socket; } diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java new file mode 100644 index 0000000..098b1a2 --- /dev/null +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeHandlerJUnitTest.java @@ -0,0 +1,140 @@ +package org.apache.geode.internal.protocol.protobuf.v1.operations; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.shiro.subject.Subject; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.exception.InvalidExecutionContextException; +import org.apache.geode.internal.protocol.MessageExecutionContext; +import org.apache.geode.internal.protocol.ProtocolErrorCode; +import org.apache.geode.internal.protocol.Result; +import org.apache.geode.internal.protocol.protobuf.Handshake; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; +import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; +import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil; +import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; +import org.apache.geode.internal.protocol.protobuf.v1.state.ConnectionShiroAuthenticatingStateProcessor; +import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor; +import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities; +import org.apache.geode.internal.protocol.serialization.SerializationService; +import org.apache.geode.internal.protocol.state.ConnectionShiroAuthorizingStateProcessor; +import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor; +import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics; +import org.apache.geode.internal.security.SecurityService; +import org.apache.geode.test.junit.categories.UnitTest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +@Category(UnitTest.class) +public class HandshakeHandlerJUnitTest { + private static final int INVALID_MAJOR_VERSION = 67; + private static final int INVALID_MINOR_VERSION = 92347; + + private HandshakeHandler handshakeHandler = new HandshakeHandler(); + + @Test + public void testCurrentVersionHandshakeSucceeds() throws Exception { + Handshake.NewConnectionHandshake handshakeRequest = + generateHandshakeRequest(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, + Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE); + + ByteArrayInputStream inputStream = + MessageUtil.writeMessageDelimitedToInputStream(handshakeRequest); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + assertTrue(handshakeHandler.handleHandshake(inputStream, outputStream, + mock(ProtocolClientStatistics.class))); + + Handshake.HandshakeAcknowledgement handshakeResponse = Handshake.HandshakeAcknowledgement + .parseDelimitedFrom(new ByteArrayInputStream(outputStream.toByteArray())); + assertTrue(handshakeResponse.getHandshakePassed()); + assertEquals(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, + handshakeResponse.getServerMajorVersion()); + assertEquals(Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE, + handshakeResponse.getServerMinorVersion()); + } + + @Test + public void testInvalidMajorVersionFails() throws Exception { + assertNotEquals(INVALID_MAJOR_VERSION, Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); + + Handshake.NewConnectionHandshake handshakeRequest = generateHandshakeRequest( + INVALID_MAJOR_VERSION, Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE); + + verifyHandshakeFails(handshakeRequest); + + // Also validate the protobuf INVALID_MAJOR_VERSION_VALUE constant fails + handshakeRequest = generateHandshakeRequest(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE, + Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE); + verifyHandshakeFails(handshakeRequest); + } + + private void verifyHandshakeFails(Handshake.NewConnectionHandshake handshakeRequest) + throws Exception { + ByteArrayInputStream inputStream = + MessageUtil.writeMessageDelimitedToInputStream(handshakeRequest); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + try { + handshakeHandler.handleHandshake(inputStream, outputStream, + mock(ProtocolClientStatistics.class)); + fail("Invalid handshake should throw IOException"); + } catch (IOException e) { + // expected if handshake verification fails + } + + Handshake.HandshakeAcknowledgement handshakeResponse = Handshake.HandshakeAcknowledgement + .parseDelimitedFrom(new ByteArrayInputStream(outputStream.toByteArray())); + + assertFalse(handshakeResponse.getHandshakePassed()); + } + + @Test + public void testInvalidMinorVersionFails() throws Exception { + assertNotEquals(INVALID_MINOR_VERSION, Handshake.MinorVersions.CURRENT_MINOR_VERSION_VALUE); + + Handshake.NewConnectionHandshake handshakeRequest = generateHandshakeRequest( + Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, INVALID_MINOR_VERSION); + + verifyHandshakeFails(handshakeRequest); + + // Also validate the protobuf INVALID_MINOR_VERSION_VALUE constant fails + handshakeRequest = generateHandshakeRequest(Handshake.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, + Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE); + verifyHandshakeFails(handshakeRequest); + } + + private Handshake.NewConnectionHandshake generateHandshakeRequest(int majorVersion, + int minorVersion) { + return Handshake.NewConnectionHandshake.newBuilder().setMajorVersion(majorVersion) + .setMinorVersion(minorVersion).build(); + } +} diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java deleted file mode 100644 index 0baf9bb..0000000 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/HandshakeRequestOperationHandlerJUnitTest.java +++ /dev/null @@ -1,191 +0,0 @@ -package org.apache.geode.internal.protocol.protobuf.v1.operations; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import org.apache.shiro.subject.Subject; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.exception.InvalidExecutionContextException; -import org.apache.geode.internal.protocol.Failure; -import org.apache.geode.internal.protocol.MessageExecutionContext; -import org.apache.geode.internal.protocol.ProtocolErrorCode; -import org.apache.geode.internal.protocol.Result; -import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; -import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; -import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; -import org.apache.geode.internal.protocol.protobuf.v1.state.ConnectionShiroAuthenticatingStateProcessor; -import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor; -import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufResponseUtilities; -import org.apache.geode.internal.protocol.serialization.SerializationService; -import org.apache.geode.internal.protocol.state.ConnectionShiroAuthorizingStateProcessor; -import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor; -import org.apache.geode.internal.protocol.state.exception.ConnectionStateException; -import org.apache.geode.internal.security.SecurityService; -import org.apache.geode.test.junit.categories.UnitTest; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -@Category(UnitTest.class) -public class HandshakeRequestOperationHandlerJUnitTest { - private static final int INVALID_MAJOR_VERSION = 67; - private static final int INVALID_MINOR_VERSION = 92347; - - private HandshakeRequestOperationHandler handshakeHandler = - new HandshakeRequestOperationHandler(); - private SerializationService serializationService = new ProtobufSerializationService(); - private ProtobufConnectionHandshakeStateProcessor handshakeStateProcessor; - - @Before - public void Setup() { - handshakeStateProcessor = - new ProtobufConnectionHandshakeStateProcessor(mock(SecurityService.class)); - } - - @Test - public void testCurrentVersionHandshakeSucceeds() throws Exception { - ConnectionAPI.HandshakeRequest handshakeRequest = - generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, - ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE); - MessageExecutionContext messageExecutionContext = - new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor); - Result result = - handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext); - ConnectionAPI.HandshakeResponse handshakeResponse = result.getMessage(); - assertTrue(handshakeResponse.getHandshakePassed()); - assertEquals(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, - handshakeResponse.getServerMajorVersion()); - assertEquals(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE, - handshakeResponse.getServerMinorVersion()); - } - - @Test - public void testInvalidMajorVersionFails() throws Exception { - assertNotEquals(INVALID_MAJOR_VERSION, ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE); - - ConnectionAPI.HandshakeRequest handshakeRequest = generateHandshakeRequest( - INVALID_MAJOR_VERSION, ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE); - MessageExecutionContext messageExecutionContext = - new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor); - - verifyHandshakeFails(handshakeRequest, messageExecutionContext); - } - - @Test - public void testInvalidMajorVersionProtocolConstantFails() throws Exception { - ConnectionAPI.HandshakeRequest handshakeRequest = - generateHandshakeRequest(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE, - ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE); - MessageExecutionContext messageExecutionContext = - new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor); - verifyHandshakeFails(handshakeRequest, messageExecutionContext); - } - - private void verifyHandshakeFails(ConnectionAPI.HandshakeRequest handshakeRequest, - MessageExecutionContext messageExecutionContext) throws Exception { - Result result = - handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext); - ConnectionAPI.HandshakeResponse handshakeResponse = result.getMessage(); - assertFalse(handshakeResponse.getHandshakePassed()); - } - - @Test - public void testInvalidMinorVersionFails() throws Exception { - assertNotEquals(INVALID_MINOR_VERSION, ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE); - - ConnectionAPI.HandshakeRequest handshakeRequest = generateHandshakeRequest( - ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, INVALID_MINOR_VERSION); - MessageExecutionContext messageExecutionContext = - new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor); - - verifyHandshakeFails(handshakeRequest, messageExecutionContext); - } - - @Test - public void testInvalidMinorVersionProtocolConstantFails() throws Exception { - ConnectionAPI.HandshakeRequest handshakeRequest = - generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, - ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE); - MessageExecutionContext messageExecutionContext = - new MessageExecutionContext(mock(InternalCache.class), null, handshakeStateProcessor); - - verifyHandshakeFails(handshakeRequest, messageExecutionContext); - } - - @Test - public void testNoSecurityStateFailsHandshake() throws Exception { - ConnectionAPI.HandshakeRequest handshakeRequest = - generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, - ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE); - MessageExecutionContext messageExecutionContext = new MessageExecutionContext( - mock(InternalCache.class), null, new NoSecurityConnectionStateProcessor()); - - try { - handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext); - fail("Handshake in non-handshake state should throw exception"); - } catch (ConnectionStateException ex) { - assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode()); - } - } - - @Test - public void testAuthenticatingStateFailsHandshake() throws Exception { - ConnectionAPI.HandshakeRequest handshakeRequest = - generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, - ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE); - MessageExecutionContext messageExecutionContext = - new MessageExecutionContext(mock(InternalCache.class), null, - new ConnectionShiroAuthenticatingStateProcessor(mock(SecurityService.class))); - - try { - handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext); - fail("Handshake in non-handshake state should throw exception"); - } catch (ConnectionStateException ex) { - assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode()); - } - } - - @Test - public void testAuthorizingStateFailsHandshake() throws Exception { - ConnectionAPI.HandshakeRequest handshakeRequest = - generateHandshakeRequest(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE, - ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE); - MessageExecutionContext messageExecutionContext = - new MessageExecutionContext(mock(InternalCache.class), null, - new ConnectionShiroAuthorizingStateProcessor(mock(SecurityService.class), - mock(Subject.class))); - - try { - handshakeHandler.process(serializationService, handshakeRequest, messageExecutionContext); - fail("Handshake in non-handshake state should throw exception"); - } catch (ConnectionStateException ex) { - assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION, ex.getErrorCode()); - } - } - - private ConnectionAPI.HandshakeRequest generateHandshakeRequest(int majorVersion, - int minorVersion) { - return ConnectionAPI.HandshakeRequest.newBuilder().setMajorVersion(majorVersion) - .setMinorVersion(minorVersion).build(); - } -} diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java index b59d154..e83e6e0 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/VersionValidatorJUnitTest.java @@ -20,7 +20,7 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI; +import org.apache.geode.internal.protocol.protobuf.Handshake; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -33,11 +33,11 @@ public class VersionValidatorJUnitTest { @Test public void testInvalidVersions() throws Exception { assertFalse( - validator.isValid(MAJOR_VERSION, ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE)); + validator.isValid(MAJOR_VERSION, Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE)); assertFalse( - validator.isValid(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE, MINOR_VERSION)); - assertFalse(validator.isValid(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE, - ConnectionAPI.MinorVersions.INVALID_MINOR_VERSION_VALUE)); + validator.isValid(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE, MINOR_VERSION)); + assertFalse(validator.isValid(Handshake.MajorVersions.INVALID_MAJOR_VERSION_VALUE, + Handshake.MinorVersions.INVALID_MINOR_VERSION_VALUE)); } @Test -- To stop receiving notification emails like this one, please contact ['"commits@geode.apache.org" '].