geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [geode] branch develop updated: GEODE-4132: Sending handshake response from locator for new protocol
Date Wed, 27 Dec 2017 00:23:00 GMT
This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8a1ec2d  GEODE-4132: Sending handshake response from locator for new protocol
8a1ec2d is described below

commit 8a1ec2d85332b5341605271b4ceec66efc738b03
Author: Dan Smith <dsmith@pivotal.io>
AuthorDate: Tue Dec 26 16:22:56 2017 -0800

    GEODE-4132: Sending handshake response from locator for new protocol
    
    Changing the locator to send a response to the NewConnectionVersion
    message to be consistent with the server. This change also allows the
    locator to handle an authentication message, which means that
    GetAvailableServers will work if a security manager is enabled and the
    client sends a validate authentication message.
    
    Signed-off-by: Sarge <mdodge@pivotal.io>
---
 ...xt.java => LocatorMessageExecutionContext.java} |  50 ++-----
 .../internal/protocol/MessageExecutionContext.java |  55 +------
 ...ext.java => ServerMessageExecutionContext.java} |  45 +-----
 .../distributed/internal/InternalLocator.java      |   9 +-
 .../distributed/internal/tcpserver/TcpServer.java  |   8 +-
 .../client/protocol/ClientProtocolService.java     |   3 +-
 .../geode/experimental/driver/ProtobufDriver.java  |  11 +-
 .../protobuf/v1/ProtobufCachePipeline.java         |  14 +-
 .../protobuf/v1/ProtobufLocatorPipeline.java       |  87 -----------
 .../protobuf/v1/ProtobufProtocolService.java       |  17 ++-
 .../GetAvailableServersOperationHandler.java       |   2 +
 .../internal/protocol/TestExecutionContext.java    |   4 +-
 .../LocatorConnectionAuthenticationDUnitTest.java  | 164 +++++++++++++++++++++
 .../v1/acceptance/LocatorConnectionDUnitTest.java  |   1 +
 14 files changed, 228 insertions(+), 242 deletions(-)

diff --git a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/LocatorMessageExecutionContext.java
similarity index 55%
copy from geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
copy to geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/LocatorMessageExecutionContext.java
index ec98b0e..9c77561 100644
--- a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
+++ b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/LocatorMessageExecutionContext.java
@@ -19,36 +19,19 @@ package org.apache.geode.internal.protocol;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
+import org.apache.geode.internal.protocol.state.ConnectionTerminatingStateProcessor;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
 
 @Experimental
-public class MessageExecutionContext {
-  private final Cache cache;
+public class LocatorMessageExecutionContext extends MessageExecutionContext {
   private final Locator locator;
-  private final ProtocolClientStatistics statistics;
-  private ConnectionStateProcessor connectionStateProcessor;
 
-  public MessageExecutionContext(Cache cache, ProtocolClientStatistics statistics,
-      ConnectionStateProcessor initialConnectionStateProcessor) {
-    this.cache = cache;
-    this.locator = null;
-    this.statistics = statistics;
-    this.connectionStateProcessor = initialConnectionStateProcessor;
-  }
-
-  public MessageExecutionContext(InternalLocator locator, ProtocolClientStatistics statistics,
+  public LocatorMessageExecutionContext(Locator locator, ProtocolClientStatistics statistics,
       ConnectionStateProcessor initialConnectionStateProcessor) {
+    super(statistics, initialConnectionStateProcessor);
     this.locator = locator;
-    this.cache = null;
-    this.statistics = statistics;
-    connectionStateProcessor = initialConnectionStateProcessor;
-  }
-
-  public ConnectionStateProcessor getConnectionStateProcessor() {
-    return connectionStateProcessor;
   }
 
   /**
@@ -57,12 +40,11 @@ public class MessageExecutionContext {
    *
    * @throws InvalidExecutionContextException if there is no cache available
    */
+  @Override
   public Cache getCache() throws InvalidExecutionContextException {
-    if (cache != null) {
-      return cache;
-    }
+    setConnectionStateProcessor(new ConnectionTerminatingStateProcessor());
     throw new InvalidExecutionContextException(
-        "Operations on the locator should not to try to operate on a cache");
+        "Operations on the locator should not to try to operate on a server");
   }
 
   /**
@@ -71,23 +53,9 @@ public class MessageExecutionContext {
    *
    * @throws InvalidExecutionContextException if there is no locator available
    */
+  @Override
   public Locator getLocator() throws InvalidExecutionContextException {
-    if (locator != null) {
-      return locator;
-    }
-    throw new InvalidExecutionContextException(
-        "Operations on the server should not to try to operate on a locator");
+    return locator;
   }
 
-  /**
-   * Returns the statistics for recording operation stats. In a unit test environment this may not
-   * be a protocol-specific statistics implementation.
-   */
-  public ProtocolClientStatistics getStatistics() {
-    return statistics;
-  }
-
-  public void setConnectionStateProcessor(ConnectionStateProcessor connectionStateProcessor) {
-    this.connectionStateProcessor = connectionStateProcessor;
-  }
 }
diff --git a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
index ec98b0e..a3d0ca7 100644
--- a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
+++ b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
@@ -12,72 +12,33 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.protocol;
 
-
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
 
 @Experimental
-public class MessageExecutionContext {
-  private final Cache cache;
-  private final Locator locator;
-  private final ProtocolClientStatistics statistics;
-  private ConnectionStateProcessor connectionStateProcessor;
-
-  public MessageExecutionContext(Cache cache, ProtocolClientStatistics statistics,
-      ConnectionStateProcessor initialConnectionStateProcessor) {
-    this.cache = cache;
-    this.locator = null;
-    this.statistics = statistics;
-    this.connectionStateProcessor = initialConnectionStateProcessor;
-  }
+public abstract class MessageExecutionContext {
+  protected final ProtocolClientStatistics statistics;
+  protected ConnectionStateProcessor connectionStateProcessor;
 
-  public MessageExecutionContext(InternalLocator locator, ProtocolClientStatistics statistics,
-      ConnectionStateProcessor initialConnectionStateProcessor) {
-    this.locator = locator;
-    this.cache = null;
+  public MessageExecutionContext(ProtocolClientStatistics statistics,
+      ConnectionStateProcessor connectionStateProcessor) {
     this.statistics = statistics;
-    connectionStateProcessor = initialConnectionStateProcessor;
+    this.connectionStateProcessor = connectionStateProcessor;
   }
 
   public ConnectionStateProcessor getConnectionStateProcessor() {
     return connectionStateProcessor;
   }
 
-  /**
-   * Returns the cache associated with this execution
-   * <p>
-   *
-   * @throws InvalidExecutionContextException if there is no cache available
-   */
-  public Cache getCache() throws InvalidExecutionContextException {
-    if (cache != null) {
-      return cache;
-    }
-    throw new InvalidExecutionContextException(
-        "Operations on the locator should not to try to operate on a cache");
-  }
+  public abstract Cache getCache() throws InvalidExecutionContextException;
 
-  /**
-   * Returns the locator associated with this execution
-   * <p>
-   *
-   * @throws InvalidExecutionContextException if there is no locator available
-   */
-  public Locator getLocator() throws InvalidExecutionContextException {
-    if (locator != null) {
-      return locator;
-    }
-    throw new InvalidExecutionContextException(
-        "Operations on the server should not to try to operate on a locator");
-  }
+  public abstract Locator getLocator() throws InvalidExecutionContextException;
 
   /**
    * Returns the statistics for recording operation stats. In a unit test environment this may not
diff --git a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/ServerMessageExecutionContext.java
similarity index 57%
copy from geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
copy to geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/ServerMessageExecutionContext.java
index ec98b0e..c2c8606 100644
--- a/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
+++ b/geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/ServerMessageExecutionContext.java
@@ -25,30 +25,13 @@ import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
 
 @Experimental
-public class MessageExecutionContext {
+public class ServerMessageExecutionContext extends MessageExecutionContext {
   private final Cache cache;
-  private final Locator locator;
-  private final ProtocolClientStatistics statistics;
-  private ConnectionStateProcessor connectionStateProcessor;
 
-  public MessageExecutionContext(Cache cache, ProtocolClientStatistics statistics,
+  public ServerMessageExecutionContext(Cache cache, ProtocolClientStatistics statistics,
       ConnectionStateProcessor initialConnectionStateProcessor) {
+    super(statistics, initialConnectionStateProcessor);
     this.cache = cache;
-    this.locator = null;
-    this.statistics = statistics;
-    this.connectionStateProcessor = initialConnectionStateProcessor;
-  }
-
-  public MessageExecutionContext(InternalLocator locator, ProtocolClientStatistics statistics,
-      ConnectionStateProcessor initialConnectionStateProcessor) {
-    this.locator = locator;
-    this.cache = null;
-    this.statistics = statistics;
-    connectionStateProcessor = initialConnectionStateProcessor;
-  }
-
-  public ConnectionStateProcessor getConnectionStateProcessor() {
-    return connectionStateProcessor;
   }
 
   /**
@@ -57,12 +40,9 @@ public class MessageExecutionContext {
    *
    * @throws InvalidExecutionContextException if there is no cache available
    */
+  @Override
   public Cache getCache() throws InvalidExecutionContextException {
-    if (cache != null) {
-      return cache;
-    }
-    throw new InvalidExecutionContextException(
-        "Operations on the locator should not to try to operate on a cache");
+    return cache;
   }
 
   /**
@@ -71,23 +51,10 @@ public class MessageExecutionContext {
    *
    * @throws InvalidExecutionContextException if there is no locator available
    */
+  @Override
   public Locator getLocator() throws InvalidExecutionContextException {
-    if (locator != null) {
-      return locator;
-    }
     throw new InvalidExecutionContextException(
         "Operations on the server should not to try to operate on a locator");
   }
 
-  /**
-   * Returns the statistics for recording operation stats. In a unit test environment this may not
-   * be a protocol-specific statistics implementation.
-   */
-  public ProtocolClientStatistics getStatistics() {
-    return statistics;
-  }
-
-  public void setConnectionStateProcessor(ConnectionStateProcessor connectionStateProcessor) {
-    this.connectionStateProcessor = connectionStateProcessor;
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 2f5e599..4d054cc 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -523,6 +523,9 @@ public class InternalLocator extends Locator implements ConnectListener {
   }
 
   public InternalCache getCache() {
+    if (myCache == null) {
+      return GemFireCacheImpl.getInstance();
+    }
     return myCache;
   }
 
@@ -1082,7 +1085,11 @@ public class InternalLocator extends Locator implements ConnectListener {
 
   @Override
   public DistributedSystem getDistributedSystem() {
-    return this.myDs;
+    if (myDs == null) {
+      return InternalDistributedSystem.getAnyInstance();
+    }
+
+    return myDs;
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 4e002d8..b72383e 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -534,9 +534,11 @@ public class TcpServer {
       ClientProtocolService clientProtocolService = clientProtocolServiceLoader.lookupService();
       clientProtocolService.initializeStatistics("LocatorStats",
           internalLocator.getDistributedSystem());
-      try (ClientProtocolProcessor pipeline =
-          clientProtocolService.createProcessorForLocator(internalLocator)) {
-        pipeline.processMessage(input, socket.getOutputStream());
+      try (ClientProtocolProcessor pipeline = clientProtocolService.createProcessorForLocator(
+          internalLocator, internalLocator.getCache().getSecurityService())) {
+        while (!pipeline.socketProcessingIsFinished()) {
+          pipeline.processMessage(input, socket.getOutputStream());
+        }
       } catch (IncompatibleVersionException e) {
         // should not happen on the locator as there is no handshake.
         log.error("Unexpected exception in client message processing", e);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
index b86d76a..d31a4d5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
@@ -37,7 +37,8 @@ public interface ClientProtocolService {
   /**
    * Create a locator processor. The locator does not currently provide any authentication.
    */
-  ClientProtocolProcessor createProcessorForLocator(InternalLocator locator);
+  ClientProtocolProcessor createProcessorForLocator(InternalLocator locator,
+      SecurityService securityService);
 
   int getServiceProtocolVersion();
 }
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
index 1dac79f..898b7a2 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
@@ -133,24 +133,23 @@ public class ProtobufDriver implements Driver {
         final Socket locatorSocket = new Socket(locator.getAddress(), locator.getPort());
 
         final OutputStream outputStream = locatorSocket.getOutputStream();
+        final InputStream inputStream = locatorSocket.getInputStream();
         ProtocolVersion.NewConnectionClientVersion.newBuilder()
             .setMajorVersion(ProtocolVersion.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
             .setMinorVersion(ProtocolVersion.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build()
             .writeDelimitedTo(outputStream);
 
         // The locator does not currently send a reply to the ProtocolVersion...
-        // if
-        // (!ProtocolVersion.HandshakeAcknowledgement.parseDelimitedFrom(inputStream).getHandshakePassed())
-        // {
-        // throw new IOException("Failed ProtocolVersion.");
-        // }
+        if (!ProtocolVersion.VersionAcknowledgement.parseDelimitedFrom(inputStream)
+            .getVersionAccepted()) {
+          throw new IOException("Failed ProtocolVersion.");
+        }
 
         ClientProtocol.Message.newBuilder()
             .setRequest(ClientProtocol.Request.newBuilder()
                 .setGetAvailableServersRequest(LocatorAPI.GetAvailableServersRequest.newBuilder()))
             .build().writeDelimitedTo(outputStream);
 
-        final InputStream inputStream = locatorSocket.getInputStream();
         LocatorAPI.GetAvailableServersResponse getAvailableServersResponse = ClientProtocol.Message
             .parseDelimitedFrom(inputStream).getResponse().getGetAvailableServersResponse();
         if (getAvailableServersResponse.getServersCount() < 1) {
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufCachePipeline.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufCachePipeline.java
index 4b88ec4..c76db63 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufCachePipeline.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufCachePipeline.java
@@ -20,31 +20,23 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
 import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor;
-import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
-import org.apache.geode.internal.security.SecurityService;
 
 
 @Experimental
 public final class ProtobufCachePipeline implements ClientProtocolProcessor {
   private final ProtocolClientStatistics statistics;
   private final ProtobufStreamProcessor streamProcessor;
-  private final ConnectionStateProcessor initialCacheConnectionStateProcessor;
   private final MessageExecutionContext messageExecutionContext;
 
   ProtobufCachePipeline(ProtobufStreamProcessor protobufStreamProcessor,
-      ProtocolClientStatistics statistics, Cache cache, SecurityService securityService) {
+      MessageExecutionContext context) {
     this.streamProcessor = protobufStreamProcessor;
-    this.statistics = statistics;
+    this.statistics = context.getStatistics();
     this.statistics.clientConnected();
-    this.initialCacheConnectionStateProcessor =
-        new ProtobufConnectionHandshakeStateProcessor(securityService);
-    this.messageExecutionContext =
-        new MessageExecutionContext(cache, statistics, initialCacheConnectionStateProcessor);
+    this.messageExecutionContext = context;
   }
 
   @Override
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
deleted file mode 100644
index 58e8816..0000000
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufLocatorPipeline.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.protocol.protobuf.v1;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PushbackInputStream;
-
-import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.IncompatibleVersionException;
-import org.apache.geode.distributed.internal.InternalLocator;
-import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
-import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.protocol.MessageExecutionContext;
-import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
-import org.apache.geode.internal.protocol.protobuf.v1.operations.VersionValidator;
-import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
-import org.apache.geode.internal.protocol.state.NoSecurityConnectionStateProcessor;
-import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
-
-@Experimental
-public final class ProtobufLocatorPipeline implements ClientProtocolProcessor {
-  private final ProtocolClientStatistics statistics;
-  private final InternalLocator locator;
-  private final ProtobufStreamProcessor streamProcessor;
-  private final ConnectionStateProcessor locatorConnectionState;
-  private final VersionValidator validator;
-
-  ProtobufLocatorPipeline(ProtobufStreamProcessor protobufStreamProcessor,
-      ProtocolClientStatistics statistics, InternalLocator locator) {
-    this.streamProcessor = protobufStreamProcessor;
-    this.statistics = statistics;
-    this.locator = locator;
-    this.statistics.clientConnected();
-    this.locatorConnectionState = new NoSecurityConnectionStateProcessor();
-    this.validator = new VersionValidator();
-  }
-
-  @Override
-  public void processMessage(InputStream inputStream, OutputStream outputStream)
-      throws IOException, IncompatibleVersionException {
-    handleHandshakeMessage(inputStream);
-    streamProcessor.receiveMessage(inputStream, outputStream,
-        new MessageExecutionContext(locator, statistics, locatorConnectionState));
-  }
-
-  @Override
-  public void close() {
-    this.statistics.clientDisconnected();
-  }
-
-  @Override
-  public boolean socketProcessingIsFinished() {
-    // All locator connections are closed after one message, so this is not used
-    return false;
-  }
-
-  private void handleHandshakeMessage(InputStream inputStream) throws IOException {
-    // Incoming connection had the first byte removed to determine communication mode, add that
-    // back before parsing.
-    PushbackInputStream handshakeStream = new PushbackInputStream(inputStream);
-    handshakeStream.unread(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-
-    ProtocolVersion.NewConnectionClientVersion handshakeRequest =
-        ProtocolVersion.NewConnectionClientVersion.parseDelimitedFrom(handshakeStream);
-    int majorVersion = handshakeRequest.getMajorVersion();
-    int minorVersion = handshakeRequest.getMinorVersion();
-    if (!validator.isValid(majorVersion, minorVersion)) {
-      throw new IOException(
-          "Invalid protobuf client version number: " + majorVersion + "." + minorVersion);
-    }
-  }
-}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
index 1794a5e..74bee72 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
@@ -19,8 +19,11 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
+import org.apache.geode.internal.protocol.LocatorMessageExecutionContext;
+import org.apache.geode.internal.protocol.ServerMessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
 import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatisticsImpl;
+import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor;
 import org.apache.geode.internal.protocol.statistics.NoOpStatistics;
 import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
 import org.apache.geode.internal.security.SecurityService;
@@ -41,8 +44,10 @@ public class ProtobufProtocolService implements ClientProtocolService {
       SecurityService securityService) {
     assert (statistics != null);
 
-    return new ProtobufCachePipeline(protobufStreamProcessor, getStatistics(), cache,
-        securityService);
+    ProtobufConnectionHandshakeStateProcessor connectionStateProcessor =
+        new ProtobufConnectionHandshakeStateProcessor(securityService);
+    return new ProtobufCachePipeline(protobufStreamProcessor,
+        new ServerMessageExecutionContext(cache, statistics, connectionStateProcessor));
   }
 
   /**
@@ -57,8 +62,12 @@ public class ProtobufProtocolService implements ClientProtocolService {
   }
 
   @Override
-  public ClientProtocolProcessor createProcessorForLocator(InternalLocator locator) {
-    return new ProtobufLocatorPipeline(protobufStreamProcessor, getStatistics(), locator);
+  public ClientProtocolProcessor createProcessorForLocator(InternalLocator locator,
+      SecurityService securityService) {
+    ProtobufConnectionHandshakeStateProcessor connectionStateProcessor =
+        new ProtobufConnectionHandshakeStateProcessor(securityService);
+    return new ProtobufCachePipeline(protobufStreamProcessor,
+        new LocatorMessageExecutionContext(locator, statistics, connectionStateProcessor));
   }
 
   @Override
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAvailableServersOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAvailableServersOperationHandler.java
index c6f68c6..1979b70 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAvailableServersOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAvailableServersOperationHandler.java
@@ -32,6 +32,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.serialization.SerializationService;
+import org.apache.geode.internal.protocol.state.ConnectionTerminatingStateProcessor;
 
 @Experimental
 public class GetAvailableServersOperationHandler implements
@@ -43,6 +44,7 @@ public class GetAvailableServersOperationHandler implements
       LocatorAPI.GetAvailableServersRequest request,
       MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException {
 
+    messageExecutionContext.setConnectionStateProcessor(new ConnectionTerminatingStateProcessor());
     InternalLocator internalLocator = (InternalLocator) messageExecutionContext.getLocator();
     ArrayList serversFromSnapshot =
         internalLocator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null);
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/TestExecutionContext.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/TestExecutionContext.java
index 94779da..4228049 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/TestExecutionContext.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/TestExecutionContext.java
@@ -21,12 +21,12 @@ import org.apache.geode.internal.protocol.statistics.NoOpStatistics;
 
 public class TestExecutionContext {
   public static MessageExecutionContext getNoAuthCacheExecutionContext(Cache cache) {
-    return new MessageExecutionContext(cache, new NoOpStatistics(),
+    return new ServerMessageExecutionContext(cache, new NoOpStatistics(),
         new NoSecurityConnectionStateProcessor());
   }
 
   public static MessageExecutionContext getLocatorExecutionContext(InternalLocator locator) {
-    return new MessageExecutionContext(locator, new NoOpStatistics(),
+    return new LocatorMessageExecutionContext(locator, new NoOpStatistics(),
         new NoSecurityConnectionStateProcessor());
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionAuthenticationDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionAuthenticationDUnitTest.java
new file mode 100644
index 0000000..9dbe83a
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionAuthenticationDUnitTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.protocol.protobuf.v1.acceptance;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.Server;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
+import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil;
+import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufRequestUtilities;
+import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
+import org.apache.geode.security.SimpleTestSecurityManager;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+/**
+ * Test sending ProtoBuf messages to the locator, with a security manager configured on the locator
+ */
+@Category(DistributedTest.class)
+public class LocatorConnectionAuthenticationDUnitTest extends JUnit4CacheTestCase {
+  @Rule
+  public final DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+  private int locatorPort;
+
+  @Before
+  public void setup() throws IOException {
+    // Start a new locator with authorization
+    locatorPort = Host.getHost(0).getVM(0).invoke(() -> {
+      System.setProperty("geode.feature-protobuf-protocol", "true");
+      Properties props = new Properties();
+      props.setProperty(ConfigurationProperties.SECURITY_MANAGER,
+          SimpleTestSecurityManager.class.getName());
+      Locator locator = Locator.startLocatorAndDS(0, null, props);
+      return locator.getPort();
+    });
+
+    startCacheWithCacheServer(locatorPort);
+  }
+
+  private Socket createSocket() throws IOException {
+    Host host = Host.getHost(0);
+    Socket socket = new Socket(host.getHostName(), locatorPort);
+    MessageUtil.sendHandshake(socket);
+    MessageUtil.verifyHandshakeSuccess(socket);
+    return socket;
+  }
+
+  /**
+   * Test that if the locator has a security manager, an authorized client is allowed to get the
+   * available servers
+   */
+  @Test
+  public void authorizedClientCanGetServersIfSecurityIsEnabled() throws Throwable {
+    ClientProtocol.Request.Builder protobufRequestBuilder =
+        ProtobufUtilities.createProtobufRequestBuilder();
+
+    ClientProtocol.Message authorization =
+        ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createProtobufRequestBuilder()
+            .setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()
+                .putCredentials("security-username", "cluster").putCredentials("security-password",
+                    "cluster"))
+            .build());
+    ClientProtocol.Message getAvailableServersRequestMessage = ProtobufUtilities
+        .createProtobufMessage(protobufRequestBuilder.setGetAvailableServersRequest(
+            ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
+
+    ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+
+    try (Socket socket = createSocket()) {
+      protobufProtocolSerializer.serialize(authorization, socket.getOutputStream());
+
+      ClientProtocol.Message authorizationResponse =
+          protobufProtocolSerializer.deserialize(socket.getInputStream());
+      assertEquals(true,
+          authorizationResponse.getResponse().getAuthenticationResponse().getAuthenticated());
+      protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
+          socket.getOutputStream());
+
+      ClientProtocol.Message getAvailableServersResponseMessage =
+          protobufProtocolSerializer.deserialize(socket.getInputStream());
+      assertEquals("Got response: " + getAvailableServersResponseMessage, 1,
+          getAvailableServersResponseMessage.getResponse().getGetAvailableServersResponse()
+              .getServersCount());
+    }
+  }
+
+  /**
+   * Test that if the locator has a security manager, an unauthorized client is not allowed to do
+   * anything.
+   */
+  @Test
+  public void unauthorizedClientCannotGetServersIfSecurityIsEnabled() throws Throwable {
+    ClientProtocol.Request.Builder protobufRequestBuilder =
+        ProtobufUtilities.createProtobufRequestBuilder();
+    ClientProtocol.Message getAvailableServersRequestMessage = ProtobufUtilities
+        .createProtobufMessage(protobufRequestBuilder.setGetAvailableServersRequest(
+            ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
+
+    ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+
+    try (Socket socket = createSocket()) {
+      protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
+          socket.getOutputStream());
+
+      ClientProtocol.Message getAvailableServersResponseMessage =
+          protobufProtocolSerializer.deserialize(socket.getInputStream());
+      assertEquals("Got response: " + getAvailableServersResponseMessage, true,
+          getAvailableServersResponseMessage.getResponse().hasErrorResponse());
+    }
+  }
+
+
+  private Integer startCacheWithCacheServer(int locatorPort) throws IOException {
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+
+    Properties props = new Properties();
+    props.setProperty(ConfigurationProperties.LOCATORS, "localhost[" + locatorPort + "]");
+    props.setProperty("security-username", "cluster");
+    props.setProperty("security-password", "cluster");
+    props.setProperty(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "true");
+    InternalCache cache = getCache(props);
+    CacheServer cacheServer = cache.addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
+    return cacheServer.getPort();
+  }
+}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
index 84d0c61..451aca3 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/LocatorConnectionDUnitTest.java
@@ -75,6 +75,7 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
     int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
     Socket socket = new Socket(host.getHostName(), locatorPort);
     MessageUtil.sendHandshake(socket);
+    MessageUtil.verifyHandshakeSuccess(socket);
     return socket;
   }
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <commits@geode.apache.org>'].

Mime
View raw message