geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zho...@apache.org
Subject [34/46] geode git commit: GEODE-3075: Initial refactor adding NewProtocolServerConnection
Date Tue, 27 Jun 2017 05:00:49 GMT
GEODE-3075: Initial refactor adding NewProtocolServerConnection

subclassing `ServerConnection`.

The new code is broken but it won't be called under normal operation,
since it's gated on a system property, "geode.feature-protobuf-protocol"

Further refactoring and feature work to come.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/dfdde4af
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/dfdde4af
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/dfdde4af

Branch: refs/heads/feature/GEM-1483
Commit: dfdde4af352566d60f18b0db156fc5b7f48ebd8b
Parents: 49123c4
Author: Galen OSullivan <gosullivan@pivotal.io>
Authored: Mon Jun 12 09:33:35 2017 -0700
Committer: Hitesh Khamesra <hkhamesra@pivotal.io>
Committed: Mon Jun 26 09:26:22 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/tier/Acceptor.java     |  4 +
 .../cache/tier/sockets/AcceptorImpl.java        | 10 +--
 .../cache/tier/sockets/ClientHealthMonitor.java |  7 +-
 .../sockets/ClientProtocolMessageHandler.java   | 29 +++++++
 .../tier/sockets/LegacyServerConnection.java    | 88 ++++++++++++++++++++
 .../sockets/NewProtocolServerConnection.java    | 86 +++++++++++++++++++
 .../cache/tier/sockets/ServerConnection.java    | 75 +++++++----------
 .../tier/sockets/ServerConnectionFactory.java   | 50 +++++++++++
 .../sockets/ServerConnectionFactoryTest.java    | 77 +++++++++++++++++
 .../tier/sockets/ServerConnectionTest.java      |  7 +-
 10 files changed, 375 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 9a3241b..a95195a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -71,6 +71,10 @@ public abstract class Acceptor {
    */
   public static final byte CLIENT_TO_SERVER_FOR_QUEUE = (byte) 107;
 
+  /**
+   * For the new client-server protocol, which ignores the usual handshake mechanism.
+   */
+  public static final byte CLIENT_TO_SERVER_NEW_PROTOCOL = (byte) 110;
 
   /**
    * The GFE version of the server.

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 2a8818c..24efc93 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1422,8 +1422,8 @@ public class AcceptorImpl extends Acceptor implements Runnable {
     s.setTcpNoDelay(this.tcpNoDelay);
 
     if (communicationMode == CLIENT_TO_SERVER || communicationMode == GATEWAY_TO_GATEWAY
-        || communicationMode == MONITOR_TO_SERVER
-        || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE) {
+        || communicationMode == MONITOR_TO_SERVER || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE
+        || communicationMode == CLIENT_TO_SERVER_NEW_PROTOCOL) {
       String communicationModeStr = "";
       switch (communicationMode) {
         case CLIENT_TO_SERVER:
@@ -1466,9 +1466,9 @@ public class AcceptorImpl extends Acceptor implements Runnable {
           return;
         }
       }
-      ServerConnection serverConn = new ServerConnection(s, this.cache, this.crHelper, this.stats,
-          AcceptorImpl.handShakeTimeout, this.socketBufferSize, communicationModeStr,
-          communicationMode, this, this.securityService);
+      ServerConnection serverConn = ServerConnectionFactory.makeServerConnection(s, this.cache,
+          this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize,
+          communicationModeStr, communicationMode, this, this.securityService);
       synchronized (this.allSCsLock) {
         this.allSCs.add(serverConn);
         ServerConnection snap[] = this.allSCList; // avoid volatile read

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index e0b5ab8..35cc33f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -478,12 +478,7 @@ public class ClientHealthMonitor {
           Iterator connectionsIterator = connections.iterator();
           while (connectionsIterator.hasNext()) {
             ServerConnection sc = (ServerConnection) connectionsIterator.next();
-            byte communicationMode = sc.getCommunicationMode();
-            /* Check for all modes that could be used for Client-Server communication */
-            if (communicationMode == Acceptor.CLIENT_TO_SERVER
-                || communicationMode == Acceptor.PRIMARY_SERVER_TO_CLIENT
-                || communicationMode == Acceptor.SECONDARY_SERVER_TO_CLIENT
-                || communicationMode == Acceptor.CLIENT_TO_SERVER_FOR_QUEUE) {
+            if (sc.isClientServerConnection()) {
               memberId = sc.getMembershipID(); // each ServerConnection has the same member
id
               cci.setMemberId(memberId);
               cci.setNumberOfConnections(connections.size());

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
new file mode 100644
index 0000000..db42330
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.internal.cache.InternalCache;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Stub, this will be hooked up to the new client protocol when it's implemented.
+ */
+public class ClientProtocolMessageHandler {
+  public void receiveMessage(InputStream inputStream, OutputStream outputStream,
+      InternalCache cache) {}
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/LegacyServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/LegacyServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/LegacyServerConnection.java
new file mode 100644
index 0000000..2b46eb3
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/LegacyServerConnection.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.internal.security.SecurityService;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Handles everything but the new client protocol.
+ *
+ * Legacy is therefore a bit of a misnomer; do you have a better name?
+ */
+public class LegacyServerConnection extends ServerConnection {
+  /**
+   * Set to false once handshake has been done
+   */
+  private boolean doHandshake = true;
+
+  /**
+   * Creates a new <code>ServerConnection</code> that processes messages received
from an edge
+   * client over a given <code>Socket</code>.
+   *
+   * @param socket
+   * @param internalCache
+   * @param helper
+   * @param stats
+   * @param hsTimeout
+   * @param socketBufferSize
+   * @param communicationModeStr
+   * @param communicationMode
+   * @param acceptor
+   * @param securityService
+   */
+  public LegacyServerConnection(Socket socket, InternalCache internalCache,
+      CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+      String communicationModeStr, byte communicationMode, Acceptor acceptor,
+      SecurityService securityService) {
+    super(socket, internalCache, helper, stats, hsTimeout, socketBufferSize, communicationModeStr,
+        communicationMode, acceptor, securityService);
+  }
+
+  @Override
+  protected boolean doHandShake(byte epType, int qSize) {
+    try {
+      this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType,
qSize,
+          this.communicationMode, this.principal);
+    } catch (IOException ioe) {
+      if (!crHelper.isShutdown() && !isTerminated()) {
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.ServerConnection_0_HANDSHAKE_ACCEPT_FAILED_ON_SOCKET_1_2,
+            new Object[] {this.name, this.theSocket, ioe}));
+      }
+      cleanup();
+      return false;
+    }
+    return true;
+  }
+
+  protected void doOneMessage() {
+    if (this.doHandshake) {
+      doHandshake();
+      this.doHandshake = false;
+    } else {
+      this.resetTransientData();
+      doNormalMsg();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java
new file mode 100644
index 0000000..a78cd1c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.security.SecurityService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+/**
+ * Holds the socket and protocol handler for the new client protocol. TODO: Currently unimplemented
+ * due the the protocol not being there.
+ */
+public class NewProtocolServerConnection extends ServerConnection {
+  // The new protocol lives in a separate module and gets loaded when this class is instantiated.
+  // TODO implement this.
+  private final ClientProtocolMessageHandler newClientProtocol;
+
+
+  /**
+   * Creates a new <code>NewProtocolServerConnection</code> that processes messages
received from an
+   * edge client over a given <code>Socket</code>.
+   *
+   * @param s
+   * @param c
+   * @param helper
+   * @param stats
+   * @param hsTimeout
+   * @param socketBufferSize
+   * @param communicationModeStr
+   * @param communicationMode
+   * @param acceptor
+   */
+  public NewProtocolServerConnection(Socket s, InternalCache c, CachedRegionHelper helper,
+                                     CacheServerStats stats, int hsTimeout, int socketBufferSize,
String communicationModeStr,
+                                     byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler
newClientProtocol,
+                                     SecurityService securityService) {
+    super(s, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode,
+        acceptor, securityService);
+    assert (communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL);
+    this.newClientProtocol = newClientProtocol;
+  }
+
+  @Override
+  protected void doOneMessage() {
+    try {
+      Socket socket = this.getSocket();
+      InputStream inputStream = socket.getInputStream();
+      OutputStream outputStream = socket.getOutputStream();
+      // TODO serialization types?
+      newClientProtocol.receiveMessage(inputStream, outputStream, this.getCache());
+    } catch (IOException e) {
+      // TODO?
+    }
+    return;
+  }
+
+  @Override
+  protected boolean doHandShake(byte epType, int qSize) {
+    // no handshake for new client protocol.
+    return true;
+  }
+
+  @Override
+  public boolean isClientServerConnection() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 947b836..ebc9dab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -74,9 +74,9 @@ import org.apache.geode.security.GemFireSecurityException;
  *
  * @since GemFire 2.0.2
  */
-public class ServerConnection implements Runnable {
+public abstract class ServerConnection implements Runnable {
 
-  private static final Logger logger = LogService.getLogger();
+  protected static final Logger logger = LogService.getLogger();
 
   /**
    * This is a buffer that we add to client readTimeout value before we cleanup the connection.
This
@@ -138,12 +138,12 @@ public class ServerConnection implements Runnable {
     }
   }
 
-  private Socket theSocket;
+  protected Socket theSocket;
   // private InputStream in = null;
   // private OutputStream out = null;
   private ByteBuffer commBuffer;
-  private final CachedRegionHelper crHelper;
-  private String name = null;
+  protected final CachedRegionHelper crHelper;
+  protected String name = null;
 
   // IMPORTANT: if new messages are added change setHandshake to initialize them
   // to the correct Version for serializing to the client
@@ -168,7 +168,7 @@ public class ServerConnection implements Runnable {
   /**
    * Handshake reference uniquely identifying a client
    */
-  private ClientHandShake handshake;
+  protected ClientHandShake handshake;
   private int handShakeTimeout;
   private final Object handShakeMonitor = new Object();
 
@@ -213,7 +213,7 @@ public class ServerConnection implements Runnable {
    * The communication mode for this <code>ServerConnection</code>. Valid types
include
    * 'client-server', 'gateway-gateway' and 'monitor-server'.
    */
-  private final byte communicationMode;
+  protected final byte communicationMode;
   private final String communicationModeStr;
 
   private long processingMessageStartTime = -1;
@@ -233,7 +233,7 @@ public class ServerConnection implements Runnable {
 
   private Part securePart = null;
 
-  private Principal principal;
+  protected Principal principal;
 
   private MessageIdExtractor messageIdExtractor = new MessageIdExtractor();
 
@@ -592,19 +592,14 @@ public class ServerConnection implements Runnable {
     }
   }
 
-  private boolean acceptHandShake(byte epType, int qSize) {
-    try {
-      this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType,
qSize,
-          this.communicationMode, this.principal);
-    } catch (IOException ioe) {
-      if (!crHelper.isShutdown() && !isTerminated()) {
-        logger.warn(LocalizedMessage.create(
-            LocalizedStrings.ServerConnection_0_HANDSHAKE_ACCEPT_FAILED_ON_SOCKET_1_2,
-            new Object[] {this.name, this.theSocket, ioe}));
-      }
-      cleanup();
-      return false;
-    }
+  protected boolean acceptHandShake(byte epType, int qSize) {
+    return doHandShake(epType, qSize) && handshakeAccepted();
+  }
+
+  protected abstract boolean doHandShake(byte epType, int qSize);
+
+
+  protected boolean handshakeAccepted() {
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Accepted handshake", this.name);
     }
@@ -670,6 +665,16 @@ public class ServerConnection implements Runnable {
     }
   }
 
+  /**
+   * @return whether this is a connection to a client, regardless of protocol.
+   */
+  public boolean isClientServerConnection() {
+    return communicationMode == Acceptor.CLIENT_TO_SERVER
+        || communicationMode == Acceptor.PRIMARY_SERVER_TO_CLIENT
+        || communicationMode == Acceptor.SECONDARY_SERVER_TO_CLIENT
+        || communicationMode == Acceptor.CLIENT_TO_SERVER_FOR_QUEUE;
+  }
+
   static class Counter {
     int cnt;
 
@@ -686,22 +691,12 @@ public class ServerConnection implements Runnable {
     }
   }
 
-  // public void setUserAuthAttributes(ClientProxyMembershipID proxyId, AuthorizeRequest
-  // authzRequest, AuthorizeRequestPP postAuthzRequest) {
-  // UserAuthAttributes uaa = new UserAuthAttributes(authzRequest, postAuthzRequest);
-  // }
-
-  /**
-   * Set to false once handshake has been done
-   */
-  private boolean doHandshake = true;
-
   private boolean clientDisconnectedCleanly = false;
   private Throwable clientDisconnectedException;
   private int failureCount = 0;
   private boolean processMessages = true;
 
-  private void doHandshake() {
+  protected void doHandshake() {
     // hitesh:to create new connection handshake
     if (verifyClientConnection()) {
       // Initialize the commands after the handshake so that the version
@@ -718,7 +713,7 @@ public class ServerConnection implements Runnable {
     }
   }
 
-  private void doNormalMsg() {
+  protected void doNormalMsg() {
     Message msg = null;
     msg = BaseCommand.readRequest(this);
     ThreadState threadState = null;
@@ -903,15 +898,7 @@ public class ServerConnection implements Runnable {
     }
   }
 
-  private void doOneMessage() {
-    if (this.doHandshake) {
-      doHandshake();
-      this.doHandshake = false;
-    } else {
-      this.resetTransientData();
-      doNormalMsg();
-    }
-  }
+  protected abstract void doOneMessage();
 
   private void initializeClientUserAuths() {
     this.clientUserAuths = getClientUserAuths(this.proxyId);
@@ -1070,7 +1057,7 @@ public class ServerConnection implements Runnable {
   /**
    * MessageType of the messages (typically internal commands) which do not need to participate
in
    * security should be added in the following if block.
-   * 
+   *
    * @return Part
    * @see AbstractOp#processSecureBytes(Connection, Message)
    * @see AbstractOp#needsUserId()
@@ -1495,7 +1482,7 @@ public class ServerConnection implements Runnable {
 
   /**
    * Just ensure that this class gets loaded.
-   * 
+   *
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
new file mode 100644
index 0000000..4f2e304
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.security.SecurityService;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Creates instances of ServerConnection based on the connection mode provided.
+ */
+public class ServerConnectionFactory {
+  // TODO: implement ClientProtocolMessageHandler.
+  private static final ClientProtocolMessageHandler newClientProtocol =
+      new ClientProtocolMessageHandler();
+
+  public static ServerConnection makeServerConnection(Socket s, InternalCache c,
+      CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+      String communicationModeStr, byte communicationMode, Acceptor acceptor,
+      SecurityService securityService) throws IOException {
+    if (communicationMode == Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL) {
+      if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
+        throw new IOException("Acceptor received unknown communication mode: " + communicationMode);
+      } else {
+        return new NewProtocolServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize,
+            communicationModeStr, communicationMode, acceptor, newClientProtocol, securityService);
+      }
+    } else {
+      return new LegacyServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize,
+          communicationModeStr, communicationMode, acceptor, securityService);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
new file mode 100644
index 0000000..c314944
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.security.SecurityService;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class ServerConnectionFactoryTest {
+  /**
+   * Safeguard that we won't create the new client protocol object unless the feature flag
is enabled.
+   */
+  @Test(expected = IOException.class)
+  public void newClientProtocolThrows() throws Exception {
+    serverConnectionMockedExceptForCommunicationMode(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL);
+  }
+
+  @Test
+  public void newClientProtocolSucceedsWithSystemPropertySet() throws Exception {
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+    ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL);
+    assertTrue(serverConnection instanceof NewProtocolServerConnection);
+    System.clearProperty("geode.feature-protobuf-protocol");
+  }
+
+  @Test
+  public void makeServerConnection() throws Exception {
+    byte[] communicationModes = new byte[]{
+      Acceptor.CLIENT_TO_SERVER,
+      Acceptor.PRIMARY_SERVER_TO_CLIENT,
+      Acceptor.SECONDARY_SERVER_TO_CLIENT,
+      Acceptor.GATEWAY_TO_GATEWAY,
+      Acceptor.MONITOR_TO_SERVER,
+      Acceptor.SUCCESSFUL_SERVER_TO_CLIENT,
+      Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT,
+      Acceptor.CLIENT_TO_SERVER_FOR_QUEUE,
+    };
+
+    for (byte communicationMode : communicationModes) {
+      ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(communicationMode);
+      assertTrue(serverConnection instanceof LegacyServerConnection);
+    }
+  }
+
+  private static ServerConnection serverConnectionMockedExceptForCommunicationMode(byte communicationMode)
throws IOException {
+    Socket socketMock = mock(Socket.class);
+    when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
+
+    return ServerConnectionFactory.makeServerConnection(
+      socketMock, mock(InternalCache.class), mock(CachedRegionHelper.class),
+      mock(CacheServerStats.class), 0, 0, "",
+      communicationMode, mock(AcceptorImpl.class), mock(SecurityService.class));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index 794c610..d3ef21f 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -39,6 +39,7 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 
@@ -57,7 +58,7 @@ public class ServerConnectionTest {
   private ServerConnection serverConnection;
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     AcceptorImpl acceptor = mock(AcceptorImpl.class);
 
     InetAddress inetAddress = mock(InetAddress.class);
@@ -69,8 +70,8 @@ public class ServerConnectionTest {
     InternalCache cache = mock(InternalCache.class);
     SecurityService securityService = mock(SecurityService.class);
 
-    serverConnection = new ServerConnection(socket, cache, null, null, 0, 0, null,
-        Acceptor.PRIMARY_SERVER_TO_CLIENT, acceptor, securityService);
+    serverConnection = ServerConnectionFactory.makeServerConnection(socket, cache, null,
null, 0, 0,
+        null, Acceptor.PRIMARY_SERVER_TO_CLIENT, acceptor, securityService);
     MockitoAnnotations.initMocks(this);
   }
 


Mime
View raw message