geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hiteshkhame...@apache.org
Subject geode git commit: Added support for bypassing the handshake mechanism. A client can now connect to the server by sending byte 110. NOTE: I have modified the current GemFire client to send byte 110 to bypass the handshake. Ran PdxClientServerDUnitTest to verify that this works
Date Mon, 17 Apr 2017 22:32:07 GMT
Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2580 [created] 0a4cfab6f


Added support for bypassing the handshake mechanism. A client can now connect
to the server by sending byte 110. NOTE: I have modified the current GemFire
client to send byte 110 to bypass the handshake. Ran
PdxClientServerDUnitTest to verify that this works.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0a4cfab6
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0a4cfab6
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0a4cfab6

Branch: refs/heads/feature/GEODE-2580
Commit: 0a4cfab6f0629abd97c3b9e32664fee1e767cadc
Parents: 1156011
Author: Hitesh Khamesra <hkhamesra@pivotal.io>
Authored: Mon Apr 17 15:27:20 2017 -0700
Committer: Hitesh Khamesra <hkhamesra@pivotal.io>
Committed: Mon Apr 17 15:27:20 2017 -0700

----------------------------------------------------------------------
 .../client/internal/ConnectionFactoryImpl.java  |  3 ++-
 .../cache/client/internal/ConnectionImpl.java   | 14 ++++++++++++-
 .../geode/internal/cache/tier/Acceptor.java     |  5 +++++
 .../cache/tier/sockets/AcceptorImpl.java        |  6 +++++-
 .../internal/cache/tier/sockets/HandShake.java  | 20 +++++++++++++++++++
 .../cache/tier/sockets/ServerConnection.java    | 21 ++++++++++++++++++--
 .../geode/pdx/PdxClientServerDUnitTest.java     |  5 ++++-
 7 files changed, 68 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/0a4cfab6/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
index a419d57..93b903c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
@@ -116,7 +116,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory {
     } else if (forQueue) {
       return Acceptor.CLIENT_TO_SERVER_FOR_QUEUE;
     } else {
-      return Acceptor.CLIENT_TO_SERVER;
+      //return Acceptor.CLIENT_TO_SERVER;
+      return Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/0a4cfab6/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
index a494138..816a07e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
@@ -17,6 +17,7 @@ package org.apache.geode.cache.client.internal;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
@@ -31,10 +32,13 @@ import org.apache.geode.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionO
 import org.apache.geode.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
 import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
 import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.cache.tier.sockets.HandShake;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
@@ -103,7 +107,15 @@ public class ConnectionImpl implements Connection {
     theSocket.setSoTimeout(handShakeTimeout);
     out = theSocket.getOutputStream();
     in = theSocket.getInputStream();
-    this.status = handShake.handshakeWithServer(this, location, communicationMode);
+    
+    if(communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL) {
+      handShake.writeNewProtcolVersionForServer(this, communicationMode);
+      InetSocketAddress remoteAddr = (InetSocketAddress)theSocket.getRemoteSocketAddress();
+      DistributedMember distributedMember = new InternalDistributedMember(remoteAddr.getAddress(), remoteAddr.getPort());
+      this.status = new ServerQueueStatus(distributedMember);
+    } else {
+      this.status = handShake.handshakeWithServer(this, location, communicationMode);
+    }
     commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);
     if (sender != null) {
       commBufferForAsyncRead = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);

http://git-wip-us.apache.org/repos/asf/geode/blob/0a4cfab6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 9a3241b..4b774c6 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -70,6 +70,11 @@ public abstract class Acceptor {
    * queue (register interest, create cq, etc.).
    */
   public static final byte CLIENT_TO_SERVER_FOR_QUEUE = (byte) 107;
+  
+  /**
+   * For new client-server protocol which ignores current handshake mechanism
+   */
+  public static final byte CLIENT_TO_SERVER_NEW_PROTOCOL = (byte) 110;
 
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/0a4cfab6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index ed29472..ef8c9d9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1422,7 +1422,8 @@ public class AcceptorImpl extends Acceptor implements Runnable {
 
     if (communicationMode == CLIENT_TO_SERVER || communicationMode == GATEWAY_TO_GATEWAY
         || communicationMode == MONITOR_TO_SERVER
-        || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE) {
+        || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE
+        || communicationMode == CLIENT_TO_SERVER_NEW_PROTOCOL) {
       String communicationModeStr = "";
       switch (communicationMode) {
         case CLIENT_TO_SERVER:
@@ -1437,6 +1438,9 @@ public class AcceptorImpl extends Acceptor implements Runnable {
         case CLIENT_TO_SERVER_FOR_QUEUE:
           communicationModeStr = "clientToServerForQueue";
           break;
+        case CLIENT_TO_SERVER_NEW_PROTOCOL:
+          communicationModeStr = "clientToServerForNewProtocol";
+          break;
       }
       if (logger.isDebugEnabled()) {
         logger.debug("Bridge server: Initializing {} communication socket: {}",

http://git-wip-us.apache.org/repos/asf/geode/blob/0a4cfab6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 6e119c0..eb0c1ab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -335,6 +335,14 @@ public class HandShake implements ClientHandShake {
     setOverrides();
     this.credentials = null;
   }
+  
+  public HandShake(ClientProxyMembershipID id, DistributedSystem sys, Version v) {
+    this.id = id;
+    this.code = REPLY_OK;
+    this.system = sys;
+    this.credentials = null;
+    this.clientVersion = v;
+  }
 
   public void updateProxyID(InternalDistributedMember idm) {
     this.id.updateID(idm);
@@ -1176,6 +1184,18 @@ public class HandShake implements ClientHandShake {
     return new InternalDistributedMember(sock.getInetAddress(), sock.getPort(), false);
   }
 
+  public void writeNewProtcolVersionForServer(Connection conn, byte communicationMode) throws IOException {
+    Socket sock = conn.getSocket();
+    try {
+      DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
+      dos.writeByte(communicationMode);
+    } catch (IOException ex) {
+      CancelCriterion stopper = this.system.getCancelCriterion();
+      stopper.checkCancelInProgress(null);
+      throw ex;
+    }
+  }
+  
   /**
    * Client-side handshake with a Server
    */

http://git-wip-us.apache.org/repos/asf/geode/blob/0a4cfab6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index ecd9c7a..c1c1d1c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectableChannel;
@@ -43,6 +44,7 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.client.internal.AbstractOp;
 import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
@@ -320,11 +322,24 @@ public class ServerConnection implements Runnable {
     return executeFunctionOnLocalNodeOnly.get();
   }
 
+  private boolean createClientHandshake() {
+    logger.info("createClientHandshake this.getCommunicationMode() " + this.getCommunicationMode());
+    if( this.getCommunicationMode() != AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL )  {
+      return ServerHandShakeProcessor.readHandShake(this);
+    } else {
+      InetSocketAddress remoteAddress = (InetSocketAddress)theSocket.getRemoteSocketAddress();
+      DistributedMember member = new InternalDistributedMember(remoteAddress.getAddress(), remoteAddress.getPort());
+      this.proxyId = new ClientProxyMembershipID(member);
+      this.handshake = new HandShake(this.proxyId, this.getDistributedSystem(), Version.CURRENT);
+      return true;
+    }
+  }
+  
   private boolean verifyClientConnection() {
     synchronized (this.handShakeMonitor) {
       if (this.handshake == null) {
         // synchronized (getCleanupTable()) {
-        boolean readHandShake = ServerHandShakeProcessor.readHandShake(this);
+        boolean readHandShake = createClientHandshake();
         if (readHandShake) {
           if (this.handshake.isOK()) {
             try {
@@ -594,8 +609,10 @@ public class ServerConnection implements Runnable {
 
   private boolean acceptHandShake(byte epType, int qSize) {
     try {
-      this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType, qSize,
+      if(this.communicationMode != AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL) {
+        this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType, qSize,
           this.communicationMode, this.principal);
+      }
     } catch (IOException ioe) {
       if (!crHelper.isShutdown() && !isTerminated()) {
         logger.warn(LocalizedMessage.create(

http://git-wip-us.apache.org/repos/asf/geode/blob/0a4cfab6/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
index 6fcdf4c..16571c2 100644
--- a/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
@@ -593,7 +593,10 @@ public class PdxClientServerDUnitTest extends JUnit4CacheTestCase {
   }
 
   private int createServerRegion(final Class constraintClass) throws IOException {
-    CacheFactory cf = new CacheFactory(getDistributedSystemProperties());
+    Properties p = getDistributedSystemProperties();
+    p.put("log-level", "debug");
+    CacheFactory cf = new CacheFactory();
+    
     Cache cache = getCache(cf);
     RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
     rf.setValueConstraint(constraintClass);


Mime
View raw message