geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [67/69] [abbrv] geode git commit: GEODE-2582: New client can send a Put request with Protobuf IDL.
Date Mon, 22 May 2017 18:30:03 GMT
GEODE-2582: New client can send a Put request with Protobuf IDL.

* Update the proto IDL
* Get a put request working.

Currently this only uses String serialization; serialization protocols
still have to be implemented.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/511e2a35
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/511e2a35
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/511e2a35

Branch: refs/heads/feature/GEODE-2580
Commit: 511e2a351d4be68b7741727abc34d5147584dff7
Parents: 64798a5
Author: Galen OSullivan <gosullivan@pivotal.io>
Authored: Thu Mar 16 16:26:03 2017 -0700
Committer: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
Committed: Mon May 22 11:27:03 2017 -0700

----------------------------------------------------------------------
 geode-assembly/build.gradle                     |   9 +-
 geode-client-protobuf/build.gradle              |  88 ++++++++++
 .../client/NewClientProtocolTestClient.java     | 103 +++++++++++
 .../client/ProtobufProtocolMessageHandler.java  | 175 +++++++++++++++++++
 .../src/main/proto/basicTypes.proto             |  45 +++++
 .../src/main/proto/clientProtocol.proto         |  64 +++++++
 .../src/main/proto/region_API.proto             |  63 +++++++
 ...he.tier.sockets.ClientProtocolMessageHandler |   1 +
 .../geode/protocol/client/MessageUtils.java     |  92 ++++++++++
 .../client/ProtobufProtocolIntegrationTest.java |  85 +++++++++
 ...rotobufSerializationDeserializationTest.java | 101 +++++++++++
 .../client/internal/ConnectionFactoryImpl.java  |   3 +-
 .../cache/client/internal/ConnectionImpl.java   |  17 +-
 .../geode/internal/cache/tier/Acceptor.java     |   5 +
 .../cache/tier/sockets/AcceptorImpl.java        |   7 +-
 .../sockets/ClientProtocolMessageHandler.java   |  28 +++
 .../internal/cache/tier/sockets/HandShake.java  |  21 +++
 .../cache/tier/sockets/ServerConnection.java    |  85 +++++++--
 .../geode/serialization/Deserializer.java       |  20 +++
 .../serialization/FunctionalSerializer.java     |  36 ++++
 .../geode/serialization/SerializationType.java  |  36 ++++
 .../apache/geode/serialization/Serializer.java  |  20 +++
 .../geode/pdx/PdxClientServerDUnitTest.java     |   5 +-
 gradle/rat.gradle                               |   6 +-
 settings.gradle                                 |   1 +
 25 files changed, 1094 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-assembly/build.gradle
----------------------------------------------------------------------
diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle
index a4f0c69..6c626a0 100755
--- a/geode-assembly/build.gradle
+++ b/geode-assembly/build.gradle
@@ -61,6 +61,7 @@ dependencies {
   archives project(':geode-wan')
   archives project(':geode-cq')
   archives project(':geode-rebalancer')
+  archives project(':geode-client-protobuf')
 
   testCompile project(':geode-junit')
   testCompile project(':geode-pulse')
@@ -176,7 +177,10 @@ def cp = {
         it.contains('lucene-analyzers-common') ||
         it.contains('lucene-core') ||
         it.contains('lucene-queries') ||
-        it.contains('lucene-queryparser')
+        it.contains('lucene-queryparser') ||
+
+        // dependencies from geode-client-protobuf
+        it.contains('protobuf-java')
       }
     }
   }.flatten().unique().join(' ')
@@ -335,6 +339,9 @@ distributions {
 
         from project(":geode-lucene").configurations.runtime
         from project(":geode-lucene").configurations.archives.allArtifacts.files
+
+        from project(":geode-client-protobuf").configurations.runtime
+        from project(":geode-client-protobuf").configurations.archives.allArtifacts.files
         
         from project(":geode-old-client-support").configurations.runtime
         from project(":geode-old-client-support").configurations.archives.allArtifacts.files

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/build.gradle
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/build.gradle b/geode-client-protobuf/build.gradle
new file mode 100644
index 0000000..c82f311
--- /dev/null
+++ b/geode-client-protobuf/build.gradle
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+apply plugin: 'java'
+apply plugin: 'com.google.protobuf' // plugin jar is put on the classpath by the buildscript block below
+apply plugin: 'idea'
+
+repositories {
+  maven { url "https://plugins.gradle.org/m2/" }
+}
+
+buildscript {
+  repositories {
+    maven { url "https://plugins.gradle.org/m2/" }
+  }
+  dependencies {
+    classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.1'
+  }
+}
+
+dependencies {
+  compile 'com.google.protobuf:protobuf-java:3.0.0' // same version as the protoc artifact configured below
+  //compile 'io.grpc:grpc-stub:1.0.0-pre2'
+  //compile 'io.grpc:grpc-protobuf:1.0.0-pre2'
+  // Extra proto source files besides the ones residing under
+  // "src/main".
+  protobuf files("main/proto/") // NOTE(review): presumably resolved against the project dir, not src/ -- confirm this path exists
+  protobuf fileTree("ext/")
+
+  provided project(':geode-core') // 'provided' is not declared in this script; presumably defined by the root build -- TODO confirm
+
+  testCompile 'junit:junit:4.12'
+  testCompile project (':geode-junit')
+  testCompile "org.mockito:mockito-core:2.+" // dynamic version selector: any 2.x release
+  testCompile files(project(':geode-core').sourceSets.test.output) // geode-core's test output on the test classpath
+  // Extra proto source files for test besides the ones residing under
+  // "src/test".
+  //testProtobuf files("lib/protos-test.tar.gz")
+}
+
+protobuf {
+  protoc {
+    // The artifact spec for the Protobuf Compiler
+    artifact = 'com.google.protobuf:protoc:3.0.0'
+  }
+  // this allows our spotless rule to skip this directory (hopefully rat too)
+  generatedFilesBaseDir = "$buildDir/generated-src/proto" // also read by the idea block at the end of this script
+
+//  plugins {
+//    // Optional: an artifact spec for a protoc plugin, with "grpc" as
+//    // the identifier, which can be referred to in the "plugins"
+//    // container of the "generateProtoTasks" closure.
+//    grpc {
+//      artifact = 'io.grpc:protoc-gen-grpc-java:1.0.0-pre2'
+//    }
+//  }
+//  generateProtoTasks {
+//    ofSourceSet('main')*.plugins {
+//      // Apply the "grpc" plugin whose spec is defined above, without
+//      // options.  Note the braces cannot be omitted, otherwise the
+//      // plugin will not be added. This is because of the implicit way
+//      // NamedDomainObjectContainer binds the methods.
+//      grpc { }
+//    }
+//  }
+}
+
+// let IntelliJ know where the generated sources are.
+idea {
+  module {
+    sourceDirs += file("${protobuf.generatedFilesBaseDir}/main/java") // main/java under the generated base dir set above
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java
new file mode 100644
index 0000000..a4476e1
--- /dev/null
+++ b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.protocol.client;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.protocol.protobuf.BasicTypes;
+import org.apache.geode.protocol.protobuf.ClientProtocol;
+import org.apache.geode.protocol.protobuf.ClientProtocol.Message;
+import org.apache.geode.protocol.protobuf.RegionAPI;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Random;
+
+
+/**
+ * Minimal blocking client used to exercise the new (Protobuf-based) client protocol.
+ *
+ * <p>
+ * Each instance owns exactly one socket connection; closing the client closes that connection.
+ */
+public class NewClientProtocolTestClient implements AutoCloseable {
+  // Instance state, not static: a shared static channel would be replaced by every newly
+  // constructed client, so closing one client would close a different client's connection.
+  private final SocketChannel socketChannel;
+  private final OutputStream outputStream;
+  private final InputStream inputStream;
+
+  /**
+   * Opens a connection to the given server and captures its input and output streams.
+   *
+   * @param hostname host to connect to
+   * @param port port to connect to
+   * @throws IOException if the connection or its streams cannot be opened
+   */
+  public NewClientProtocolTestClient(String hostname, int port) throws IOException {
+    socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+    try {
+      inputStream = socketChannel.socket().getInputStream();
+      outputStream = socketChannel.socket().getOutputStream();
+    } catch (IOException e) {
+      // Do not leak the freshly opened channel when stream setup fails.
+      try {
+        socketChannel.close();
+      } catch (IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Closes the underlying socket channel.
+   *
+   * @throws IOException if closing the channel fails
+   */
+  @Override
+  public void close() throws IOException {
+    socketChannel.close();
+  }
+
+  /** Writes the single marker value that identifies a new-protocol message to the server. */
+  private static void sendHeader(OutputStream outputStream) throws IOException {
+    outputStream.write(AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL);
+  }
+
+  /**
+   * Sends one message (preceded by the protocol marker) and blocks until a reply is read.
+   *
+   * @param message the message to send
+   * @return the parsed reply; may be null when the stream reaches EOF before a reply arrives
+   * @throws IOException if writing the request or reading the reply fails
+   */
+  public Message blockingSendMessage(Message message) throws IOException {
+    sendHeader(outputStream);
+
+    message.writeDelimitedTo(outputStream);
+    outputStream.flush();
+
+    return ClientProtocol.Message.parseDelimitedFrom(inputStream);
+  }
+
+  /** Prints the given response to standard output. */
+  void parseResponse(Message response) {
+    System.out.println("response = " + response.toString());
+  }
+
+  /**
+   * Builds a sample put request for region "TestRegion" with a 64-byte key, a 512-byte value and
+   * a random correlation id.
+   */
+  private Message generateMessage() {
+    Random random = new Random();
+    ClientProtocol.MessageHeader.Builder messageHeader =
+        ClientProtocol.MessageHeader.newBuilder().setCorrelationId(random.nextInt());
+    // .setSize() //we don't need to set the size because Protobuf will handle the message frame
+
+    BasicTypes.Key.Builder key =
+        BasicTypes.Key.newBuilder().setKey(ByteString.copyFrom(createByteArrayOfSize(64)));
+
+    BasicTypes.Value.Builder value =
+        BasicTypes.Value.newBuilder().setValue(ByteString.copyFrom(createByteArrayOfSize(512)));
+
+    RegionAPI.PutRequest.Builder putRequestBuilder =
+        RegionAPI.PutRequest.newBuilder().setRegionName("TestRegion")
+            .setEntry(BasicTypes.Entry.newBuilder().setKey(key).setValue(value));
+
+    ClientProtocol.Request.Builder request =
+        ClientProtocol.Request.newBuilder().setPutRequest(putRequestBuilder);
+
+    Message.Builder message =
+        Message.newBuilder().setMessageHeader(messageHeader).setRequest(request);
+
+    return message.build();
+  }
+
+  /** Returns a new array of {@code msgSize} bytes, each set to the character 'a'. */
+  private static byte[] createByteArrayOfSize(int msgSize) {
+    byte[] array = new byte[msgSize];
+    java.util.Arrays.fill(array, (byte) 'a');
+    return array;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java
new file mode 100644
index 0000000..a6993c4
--- /dev/null
+++ b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.protocol.client;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Parser;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.protocol.protobuf.BasicTypes;
+import org.apache.geode.protocol.protobuf.RegionAPI.GetRequest;
+import org.apache.geode.protocol.protobuf.RegionAPI.GetResponse;
+import org.apache.geode.protocol.protobuf.RegionAPI.PutResponse;
+import org.apache.geode.serialization.Deserializer;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.geode.protocol.protobuf.ClientProtocol.Message;
+import static org.apache.geode.protocol.protobuf.ClientProtocol.Request;
+import static org.apache.geode.protocol.protobuf.ClientProtocol.Response;
+import static org.apache.geode.protocol.protobuf.RegionAPI.PutRequest;
+
+public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHandler {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private String ErrorMessageFromMessage(Message message) {
+    return "Error parsing message, message string: " + message.toString();
+  }
+
+  @Override
+  public void receiveMessage(InputStream inputStream, OutputStream outputStream,
+      Deserializer deserializer, Cache cache) throws IOException {
+    final Message message = Message.parseDelimitedFrom(inputStream);
+    // can be null at EOF, see Parser.parseDelimitedFrom(java.io.InputStream)
+    if (message == null) {
+      return;
+    }
+
+    if (message.getMessageTypeCase() != Message.MessageTypeCase.REQUEST) {
+      // TODO
+      logger.error(() -> "Got message of type response: " + ErrorMessageFromMessage(message));
+    }
+
+    Request request = message.getRequest();
+    Message putResponseMessage = doPutRequest(request.getPutRequest(), deserializer, cache);
+
+    putResponseMessage.writeDelimitedTo(outputStream);
+  }
+
+  private Message doPutRequest(PutRequest request, Deserializer dataDeserializer, Cache cache) {
+    logger.error("Doing put request.");
+    final String regionName = request.getRegionName();
+    final BasicTypes.Entry entry = request.getEntry();
+    final ByteString key = entry.getKey().getKey();
+    final ByteString value = entry.getValue().getValue();
+
+    final Region<Object, Object> region = cache.getRegion(regionName);
+    try {
+      region.put(dataDeserializer.deserialize(key.toByteArray()),
+          dataDeserializer.deserialize(value.toByteArray()));
+      return putResponseWithStatus(true);
+    } catch (TimeoutException | CacheWriterException ex) {
+      logger.error("Caught normal-ish exception doing region put", ex);
+      return putResponseWithStatus(false);
+    }
+  }
+
+  private Message putResponseWithStatus(boolean ok) {
+    return Message.newBuilder()
+        .setResponse(Response.newBuilder().setPutResponse(PutResponse.newBuilder().setSuccess(ok)))
+        .build();
+  }
+
+  private GetResponse doGetRequest(GetRequest request, Deserializer deserializer, Cache cache) {
+    // TODO
+    return null;
+  }
+
+  public ProtobufProtocolMessageHandler() {}
+}
+
+
+// public final class NewClientProtocol {
+// public static void recvMessage(Cache cache, InputStream inputStream, OutputStream outputStream) {
+// try {
+// final DataInputStream dataInputStream = new DataInputStream(inputStream);
+// final DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
+//
+// RequestHeader header = new RequestHeader(dataInputStream);
+//
+// // todo: string -- assume UTF-8 from here.
+// // Java modified UTF-8: unsigned short len. hope we don't run into the "modified" part.
+// switch (header.requestType) {
+// case MessageType.PUT:
+// servePutRequest(header, cache, dataInputStream, dataOutputStream);
+// break;
+// case MessageType.REQUEST: // this is a GET request.
+// serveGetRequest(cache, dataInputStream, dataOutputStream);
+// break;
+// }
+// } catch (IOException e) {
+// e.printStackTrace();
+// // todo error handling.
+// }
+// }
+//
+// private static void serveGetRequest(Cache cache, DataInputStream dataInputStream,
+// DataOutputStream dataOutputStream) throws IOException {
+// // GetRequest: Header RegionName Key CallbackArg
+// final String regionName = readString(dataInputStream);
+// final String key = readString(dataInputStream);
+// // todo no callback arg for now
+// final Region<Object, Object> region = cache.getRegion(regionName);
+// // todo anything more complex?
+//
+// Object val = region.get(key);
+// if (val == null) {
+// byte[] bytes = "Entry not found in region.".getBytes();
+// dataOutputStream.writeInt(bytes.length); // len
+// dataOutputStream.writeByte(19); // ENTRY_NOT_FOUND_EXCEPTION
+// dataOutputStream.write(bytes);
+// } else {
+// byte[] bytes = val.toString().getBytes();
+// dataOutputStream.writeInt(bytes.length); // len
+// dataOutputStream.writeByte(1); // RESPONSE
+// dataOutputStream.write(bytes);
+// }
+// dataOutputStream.flush();
+// }
+//
+// // response: size responseType requestId
+//
+// private static void servePutRequest(RequestHeader header, Cache cache,
+// DataInputStream dataInputStream, DataOutputStream dataOutputStream) throws IOException {
+// String regionName = readString(dataInputStream);
+// // assume every object is a string.
+//
+// String key = readString(dataInputStream);
+// // TODO: value header, callback arg?
+// String value = readString(dataInputStream);
+//
+// Region<Object, Object> region = cache.getRegion(regionName);
+// region.put(key, value);
+//
+// dataOutputStream.writeInt(0); // len
+// dataOutputStream.writeByte(1); // RESPONSE
+// dataOutputStream.writeInt(header.requestId);
+// dataOutputStream.flush();
+// }
+//
+// private static String readString(DataInputStream inputStream) throws IOException {
+// String s = inputStream.readUTF();
+// return s;
+// }
+// }

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/proto/basicTypes.proto
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/main/proto/basicTypes.proto b/geode-client-protobuf/src/main/proto/basicTypes.proto
new file mode 100644
index 0000000..7cb4204
--- /dev/null
+++ b/geode-client-protobuf/src/main/proto/basicTypes.proto
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+syntax = "proto3";
+package org.apache.geode.protocol.protobuf;
+
+import "google/protobuf/any.proto";
+
+message Entry{ // one Key together with its Value
+    Key key = 1;
+    Value value = 2;
+}
+
+message Key { // wrapper around the raw bytes of a key
+    bytes key = 1;
+}
+
+message Value { // raw value bytes accompanied by a ValueHeader
+    ValueHeader valueHeader = 1;
+    bytes value = 2;
+}
+
+message ValueHeader {
+    int32 size = 1; // presumably the byte length of the value -- TODO confirm
+    bool isPartial = 2; // presumably marks a value delivered in pieces -- TODO confirm
+}
+
+message CallbackArguments{
+    oneof callbackArgs { // at most one of the two fields below is set at a time
+        bool hasCallbackArgument = 1;
+        bytes callbackBytes = 2;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/proto/clientProtocol.proto
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/main/proto/clientProtocol.proto b/geode-client-protobuf/src/main/proto/clientProtocol.proto
new file mode 100644
index 0000000..7e1e7bb
--- /dev/null
+++ b/geode-client-protobuf/src/main/proto/clientProtocol.proto
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+syntax = "proto3";
+package org.apache.geode.protocol.protobuf;
+
+import "google/protobuf/any.proto";
+import "region_API.proto";
+
+message Message { // top-level envelope: a header plus either a request or a response
+    MessageHeader messageHeader = 1;
+    oneof messageType {
+        Request request = 2;
+        Response response = 3;
+    }
+}
+
+message MessageHeader { // note: field numbers below are not in declaration order
+    int32 correlationId = 2;
+    MetaData metadata = 1;
+}
+
+message Request { // exactly one region operation per request
+    oneof requestAPI {
+        PutRequest putRequest = 1;
+        GetRequest getRequest = 2;
+        PutAllRequest putAllRequest = 3;
+        GetAllRequest getAllRequest = 4;
+    }
+}
+
+message Response { // a response header plus the reply for one region operation
+    ResponseHeader responseHeader = 1;
+    oneof responseAPI {
+        PutResponse putResponse = 2;
+        GetResponse getResponse = 3;
+        PutAllResponse putAllResponse = 4;
+        GetAllResponse getAllResponse = 5;
+    }
+}
+
+message ResponseHeader {
+    oneof reponseType { // NOTE(review): looks like a typo for "responseType"; renaming would change generated accessor names
+        int32 responseTypeID = 1;
+        int32 errorCode = 2;
+    }
+}
+
+message MetaData {
+    int32 numberOfMetadata = 1; // presumably the number of entries in the map below -- TODO confirm
+    map<int32, google.protobuf.Any> metaDataEntries = 2;
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/proto/region_API.proto
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/main/proto/region_API.proto b/geode-client-protobuf/src/main/proto/region_API.proto
new file mode 100644
index 0000000..d46e0e6
--- /dev/null
+++ b/geode-client-protobuf/src/main/proto/region_API.proto
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+syntax = "proto3";
+package org.apache.geode.protocol.protobuf;
+
+import "basicTypes.proto";
+
+message PutRequest { // put of a single entry into the named region
+    string regionName = 1;
+    Entry entry = 2;
+    CallbackArguments callbackArg = 3;
+}
+
+message PutResponse {
+    bool success = 1; // the message handler in this commit sets it to false when the put throws TimeoutException or CacheWriterException
+}
+
+message GetRequest { // lookup of a single key in the named region
+    string regionName = 1;
+    Key key = 2;
+    CallbackArguments callbackArg = 3;
+}
+
+message GetResponse {
+    Value result = 1;
+}
+
+message PutAllRequest { // put of several entries into the named region
+    string regionName = 1;
+    int32 numberOfEntries = 2; // presumably the size of the repeated field below -- TODO confirm
+    repeated Entry entry = 3;
+    CallbackArguments callbackArg = 4;
+}
+
+message PutAllResponse {
+    int32 numberOfKeysFailed = 1;
+    repeated Key key = 2; // presumably the keys that failed -- TODO confirm
+}
+
+message GetAllRequest { // lookup of several keys in the named region
+    string regionName = 1;
+    int32 numberOfKeys = 2; // presumably the size of the repeated field below -- TODO confirm
+    repeated Key key = 3;
+    CallbackArguments callbackArg = 4;
+}
+
+message GetAllResponse {
+    int32 numberOfEntries = 1; // presumably the size of the repeated field below -- TODO confirm
+    repeated Entry entries = 2;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler b/geode-client-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler
new file mode 100644
index 0000000..2afaf73
--- /dev/null
+++ b/geode-client-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler
@@ -0,0 +1 @@
+org.apache.geode.protocol.client.ProtobufProtocolMessageHandler

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java
new file mode 100644
index 0000000..02e2a58
--- /dev/null
+++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.protocol.client;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import org.apache.geode.protocol.protobuf.BasicTypes;
+import org.apache.geode.protocol.protobuf.ClientProtocol;
+import org.apache.geode.protocol.protobuf.RegionAPI;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+/** Helpers for building and serializing client protocol messages in tests. */
+public class MessageUtils {
+  /**
+   * Serializes the message in length-delimited form and returns a stream over those bytes.
+   *
+   * @param message the message to serialize
+   * @return an input stream positioned at the start of the delimited message
+   * @throws IOException if serialization fails
+   */
+  public static ByteArrayInputStream loadMessageIntoInputStream(ClientProtocol.Message message)
+      throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    message.writeDelimitedTo(byteArrayOutputStream);
+    byte[] messageByteArray = byteArrayOutputStream.toByteArray();
+    return new ByteArrayInputStream(messageByteArray);
+  }
+
+  /**
+   * Builds a put request for region "TestRegion" with a 64-byte key and a 512-byte value, both
+   * filled with the character 'a', and a random correlation id.
+   */
+  public static ClientProtocol.Message makePutMessage() {
+    return buildPutMessage("TestRegion", ByteString.copyFrom(createByteArrayOfSize(64)),
+        ByteString.copyFrom(createByteArrayOfSize(512)));
+  }
+
+  /**
+   * Builds a put request for the given region whose key and value are the UTF-8 bytes of the
+   * given strings, with a random correlation id.
+   *
+   * @param region name of the target region
+   * @param key key, encoded as UTF-8
+   * @param value value, encoded as UTF-8
+   */
+  public static ClientProtocol.Message makePutMessageFor(String region, String key, String value) {
+    return buildPutMessage(region, ByteString.copyFromUtf8(key), ByteString.copyFromUtf8(value));
+  }
+
+  /** Shared construction path for both public factories, so they cannot drift apart. */
+  private static ClientProtocol.Message buildPutMessage(String region, ByteString keyBytes,
+      ByteString valueBytes) {
+    Random random = new Random();
+    ClientProtocol.MessageHeader.Builder messageHeader =
+        ClientProtocol.MessageHeader.newBuilder().setCorrelationId(random.nextInt());
+
+    BasicTypes.Key.Builder keyBuilder = BasicTypes.Key.newBuilder().setKey(keyBytes);
+    BasicTypes.Value.Builder valueBuilder = BasicTypes.Value.newBuilder().setValue(valueBytes);
+
+    RegionAPI.PutRequest.Builder putRequestBuilder =
+        RegionAPI.PutRequest.newBuilder().setRegionName(region)
+            .setEntry(BasicTypes.Entry.newBuilder().setKey(keyBuilder).setValue(valueBuilder));
+
+    ClientProtocol.Request.Builder request =
+        ClientProtocol.Request.newBuilder().setPutRequest(putRequestBuilder);
+
+    return ClientProtocol.Message.newBuilder().setMessageHeader(messageHeader)
+        .setRequest(request).build();
+  }
+
+  /** Returns a new array of {@code msgSize} bytes, each set to the character 'a'. */
+  private static byte[] createByteArrayOfSize(int msgSize) {
+    byte[] array = new byte[msgSize];
+    java.util.Arrays.fill(array, (byte) 'a');
+    return array;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java
new file mode 100644
index 0000000..5d92cf0
--- /dev/null
+++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.protocol.client;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.protocol.protobuf.ClientProtocol;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class ProtobufProtocolIntegrationTest {
+  /** Port handed to {@code createCacheOnPort} and used by the test client to connect. */
+  private static final int TEST_PORT = 40404;
+
+  /**
+   * Sends one put request through a real client socket and checks both the protobuf response and
+   * the resulting contents of the region.
+   */
+  @Test
+  public void testRoundTripClientCommunicationWorks() throws IOException {
+    try (Cache cache = createCacheOnPort(TEST_PORT);
+        NewClientProtocolTestClient client =
+            new NewClientProtocolTestClient("localhost", TEST_PORT)) {
+      final String testRegion = "testRegion";
+      final String testKey = "testKey";
+      final String testValue = "testValue";
+      // Create the region from the same constant that is put into the request.
+      Region<Object, Object> region = cache.createRegionFactory().create(testRegion);
+
+      ClientProtocol.Message message =
+          MessageUtils.makePutMessageFor(testRegion, testKey, testValue);
+      ClientProtocol.Message response = client.blockingSendMessage(message);
+      client.parseResponse(response);
+
+      // JUnit expects (expected, actual); the order matters for the failure message.
+      assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, response.getMessageTypeCase());
+      assertEquals(ClientProtocol.Response.ResponseAPICase.PUTRESPONSE,
+          response.getResponse().getResponseAPICase());
+      assertTrue(response.getResponse().getPutResponse().getSuccess());
+
+      assertEquals(1, region.size());
+      assertTrue(region.containsKey(testKey));
+      assertEquals(testValue, region.get(testKey));
+    }
+  }
+
+  /**
+   * Manual helper rather than an automated check: starts a cache and then sleeps in a loop so the
+   * server stays up. It is ignored by default because it never finishes on its own and would hang
+   * an automated test run; remove the annotation locally to use it.
+   */
+  @org.junit.Ignore("Manual helper: blocks until interrupted to keep a server running")
+  @Test
+  public void startCache() throws IOException {
+    try (Cache cache = createCacheOnPort(TEST_PORT)) {
+      while (true) {
+        try {
+          Thread.sleep(100000);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag and stop waiting so try-with-resources closes the cache.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates a cache with {@code TCP_PORT} set to {@code port} and {@code BIND_ADDRESS} set to
+   * localhost, then adds a cache server bound to localhost and starts it.
+   *
+   * NOTE(review): {@code port} is only applied to {@code TCP_PORT}; the cache server's own port is
+   * never set here - confirm that the test client really connects to the intended port.
+   *
+   * @param port value used for the {@code TCP_PORT} property
+   * @return the created cache, with its cache server already started
+   * @throws IOException if the cache server fails to start
+   */
+  private Cache createCacheOnPort(int port) throws IOException {
+    Properties props = new Properties();
+    props.setProperty(ConfigurationProperties.TCP_PORT, Integer.toString(port));
+    props.setProperty(ConfigurationProperties.BIND_ADDRESS, "localhost");
+    CacheFactory cf = new CacheFactory(props);
+    Cache cache = cf.create();
+    CacheServer cacheServer = cache.addCacheServer();
+    cacheServer.setBindAddress("localhost");
+    cacheServer.start();
+    return cache;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java
new file mode 100644
index 0000000..8cf36ca
--- /dev/null
+++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.protocol.client;
+
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.protocol.protobuf.ClientProtocol;
+import org.apache.geode.serialization.Deserializer;
+import org.apache.geode.serialization.SerializationType;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.apache.geode.protocol.client.MessageUtils.makePutMessage;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+@Category(UnitTest.class)
+public class ProtobufSerializationDeserializationTest {
+  /**
+   * A message serialized to bytes and parsed back must equal the original message.
+   */
+  @Test
+  public void protobufSerializationSmokeTest() throws InvalidProtocolBufferException {
+    ClientProtocol.Message message = makePutMessage();
+    byte[] bytes = message.toByteArray();
+    ClientProtocol.Message message1 = ClientProtocol.Message.parseFrom(bytes);
+    assertEquals(message, message1);
+  }
+
+  /**
+   * Given a serialized message that we've built, verify that the server part does the right call to
+   * the Cache it gets passed.
+   */
+  @Test
+  public void testNewClientProtocolPutsOnPutMessage() throws IOException {
+    final String testRegion = "testRegion";
+    final String testKey = "testKey";
+    final String testValue = "testValue";
+    ClientProtocol.Message message = MessageUtils.makePutMessageFor(testRegion, testKey, testValue);
+
+    Deserializer deserializer = SerializationType.BYTE_BLOB.deserializer;
+    Cache mockCache = Mockito.mock(Cache.class);
+    Region mockRegion = Mockito.mock(Region.class);
+    when(mockCache.getRegion(testRegion)).thenReturn(mockRegion);
+    OutputStream mockOutputStream = Mockito.mock(OutputStream.class);
+
+    ProtobufProtocolMessageHandler newClientProtocol = new ProtobufProtocolMessageHandler();
+    newClientProtocol.receiveMessage(MessageUtils.loadMessageIntoInputStream(message),
+        mockOutputStream, deserializer, mockCache);
+
+    // makePutMessageFor encodes key and value as UTF-8, so the expected bytes must use the same
+    // charset instead of whatever the platform default happens to be.
+    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+    verify(mockRegion).put(testKey.getBytes(utf8), testValue.getBytes(utf8));
+  }
+
+  /**
+   * Feeds a put request to the handler and checks that what it writes to the output stream parses
+   * as a successful put response.
+   */
+  @Test
+  public void testServerRespondsToPutMessage() throws IOException {
+    final String testRegion = "testRegion";
+    final String testKey = "testKey";
+    final String testValue = "testValue";
+    ClientProtocol.Message message = MessageUtils.makePutMessageFor(testRegion, testKey, testValue);
+
+    Deserializer deserializer = SerializationType.BYTE_BLOB.deserializer;
+    Cache mockCache = Mockito.mock(Cache.class);
+    Region mockRegion = Mockito.mock(Region.class);
+    when(mockCache.getRegion(testRegion)).thenReturn(mockRegion);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(128);
+
+    ProtobufProtocolMessageHandler newClientProtocol = new ProtobufProtocolMessageHandler();
+    newClientProtocol.receiveMessage(MessageUtils.loadMessageIntoInputStream(message), outputStream,
+        deserializer, mockCache);
+
+    ClientProtocol.Message responseMessage = ClientProtocol.Message
+        .parseDelimitedFrom(new ByteArrayInputStream(outputStream.toByteArray()));
+
+    // JUnit expects (expected, actual); the order matters for the failure message.
+    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
+        responseMessage.getMessageTypeCase());
+    assertEquals(ClientProtocol.Response.ResponseAPICase.PUTRESPONSE,
+        responseMessage.getResponse().getResponseAPICase());
+    assertTrue(responseMessage.getResponse().getPutResponse().getSuccess());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
index a419d57..fc9eab1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
@@ -116,7 +116,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory {
     } else if (forQueue) {
       return Acceptor.CLIENT_TO_SERVER_FOR_QUEUE;
     } else {
-      return Acceptor.CLIENT_TO_SERVER;
+      // return Acceptor.CLIENT_TO_SERVER;
+      return Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
index f71b79b..35bbcef 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
@@ -17,6 +17,7 @@ package org.apache.geode.cache.client.internal;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
@@ -31,16 +32,19 @@ import org.apache.geode.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionO
 import org.apache.geode.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
 import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
 import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.cache.tier.sockets.HandShake;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.net.SocketCreator;
 
 /**
  * A single client to server connection.
@@ -114,7 +118,16 @@ public class ConnectionImpl implements Connection {
     theSocket.setSoTimeout(handShakeTimeout);
     out = theSocket.getOutputStream();
     in = theSocket.getInputStream();
-    this.status = handShake.handshakeWithServer(this, location, communicationMode);
+
+    if (communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL) {
+      handShake.writeNewProtcolVersionForServer(this, communicationMode);
+      InetSocketAddress remoteAddr = (InetSocketAddress) theSocket.getRemoteSocketAddress();
+      DistributedMember distributedMember =
+          new InternalDistributedMember(remoteAddr.getAddress(), remoteAddr.getPort());
+      this.status = new ServerQueueStatus(distributedMember);
+    } else {
+      this.status = handShake.handshakeWithServer(this, location, communicationMode);
+    }
     commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);
     if (sender != null) {
       commBufferForAsyncRead = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 9a3241b..26df0e2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -71,6 +71,11 @@ public abstract class Acceptor {
    */
   public static final byte CLIENT_TO_SERVER_FOR_QUEUE = (byte) 107;
 
+  /**
+   * For new client-server protocol which ignores current handshake mechanism
+   */
+  public static final byte CLIENT_TO_SERVER_NEW_PROTOCOL = (byte) 110;
+
 
   /**
    * The GFE version of the server.

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 9658f98..33596ff 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1424,8 +1424,8 @@ public class AcceptorImpl extends Acceptor implements Runnable {
     s.setTcpNoDelay(this.tcpNoDelay);
 
     if (communicationMode == CLIENT_TO_SERVER || communicationMode == GATEWAY_TO_GATEWAY
-        || communicationMode == MONITOR_TO_SERVER
-        || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE) {
+        || communicationMode == MONITOR_TO_SERVER || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE
+        || communicationMode == CLIENT_TO_SERVER_NEW_PROTOCOL) {
       String communicationModeStr = "";
       switch (communicationMode) {
         case CLIENT_TO_SERVER:
@@ -1440,6 +1440,9 @@ public class AcceptorImpl extends Acceptor implements Runnable {
         case CLIENT_TO_SERVER_FOR_QUEUE:
           communicationModeStr = "clientToServerForQueue";
           break;
+        case CLIENT_TO_SERVER_NEW_PROTOCOL:
+          communicationModeStr = "clientToServerForNewProtocol";
+          break;
       }
       if (logger.isDebugEnabled()) {
         logger.debug("Bridge server: Initializing {} communication socket: {}",

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
new file mode 100644
index 0000000..aa6d4cb
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.serialization.Deserializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Handler for messages of the new client protocol. ServerConnection looks up an implementation
+ * through {@link java.util.ServiceLoader} and calls it with the streams of the client socket.
+ */
+public interface ClientProtocolMessageHandler {
+  /**
+   * Handles an incoming client message.
+   *
+   * NOTE(review): presumably one call reads one request from {@code inputStream} and writes the
+   * matching response to {@code outputStream} - confirm against the implementation.
+   *
+   * @param inputStream stream the request is read from
+   * @param outputStream stream the response is written to
+   * @param serializer deserializer handed to the handler (note the parameter is named
+   *        {@code serializer} although its type is {@link Deserializer})
+   * @param cache cache the message is handled against
+   * @throws IOException if reading from or writing to the streams fails
+   */
+  void receiveMessage(InputStream inputStream, OutputStream outputStream, Deserializer serializer,
+      Cache cache) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 388f838..77a3be9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -337,6 +337,14 @@ public class HandShake implements ClientHandShake {
     this.credentials = null;
   }
 
+  /**
+   * Creates a handshake whose reply code is already {@code REPLY_OK} and which carries no
+   * credentials.
+   *
+   * @param id membership id of the client
+   * @param sys distributed system this handshake belongs to
+   * @param v version recorded as the client's version
+   */
+  public HandShake(ClientProxyMembershipID id, DistributedSystem sys, Version v) {
+    this.system = sys;
+    this.id = id;
+    this.clientVersion = v;
+    this.code = REPLY_OK;
+    this.credentials = null;
+  }
+
   public void updateProxyID(InternalDistributedMember idm) {
     this.id.updateID(idm);
   }
@@ -1177,6 +1185,19 @@ public class HandShake implements ClientHandShake {
     return new InternalDistributedMember(sock.getInetAddress(), sock.getPort(), false);
   }
 
+  /**
+   * Writes the single communication-mode byte to the connection's socket; nothing else is sent or
+   * read here.
+   *
+   * @param conn connection whose socket is written to
+   * @param communicationMode mode byte announced to the server
+   * @throws IOException if the byte cannot be written
+   */
+  public void writeNewProtcolVersionForServer(Connection conn, byte communicationMode)
+      throws IOException {
+    final Socket socket = conn.getSocket();
+    try {
+      new DataOutputStream(socket.getOutputStream()).writeByte(communicationMode);
+    } catch (IOException writeFailure) {
+      // Consult the system's cancel criterion first (presumably it throws when a shutdown is in
+      // progress), then pass the original I/O failure on to the caller.
+      this.system.getCancelCriterion().checkCancelInProgress(null);
+      throw writeFailure;
+    }
+  }
+
   /**
    * Client-side handshake with a Server
    */

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 83d0e9d..e58e213 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -19,19 +19,25 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.security.Principal;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.geode.serialization.SerializationType;
 import org.apache.logging.log4j.Logger;
 import org.apache.shiro.subject.Subject;
 import org.apache.shiro.util.ThreadState;
@@ -42,6 +48,7 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.client.internal.AbstractOp;
 import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
@@ -147,6 +154,9 @@ public class ServerConnection implements Runnable {
   private final CachedRegionHelper crHelper;
   private String name = null;
 
+  // The new protocol lives in a separate module and gets loaded when this class is instantiated.
+  private static ClientProtocolMessageHandler newClientProtocol;
+
   // IMPORTANT: if new messages are added change setHandshake to initialize them
   // to the correct Version for serializing to the client
   private Message requestMsg = new Message(2, Version.CURRENT);
@@ -180,7 +190,9 @@ public class ServerConnection implements Runnable {
    */
   private volatile int requestSpecificTimeout = -1;
 
-  /** Tracks the id of the most recent batch to which a reply has been sent */
+  /**
+   * Tracks the id of the most recent batch to which a reply has been sent
+   */
   private int latestBatchIdReplied = -1;
 
   /*
@@ -276,6 +288,21 @@ public class ServerConnection implements Runnable {
     this.postAuthzRequest = null;
     this.randomConnectionIdGen = new Random(this.hashCode());
 
+    if (newClientProtocol == null) {
+      Iterator<ClientProtocolMessageHandler> protocolIterator =
+          ServiceLoader.load(ClientProtocolMessageHandler.class).iterator();
+      if (protocolIterator.hasNext()) {
+        newClientProtocol = protocolIterator.next();
+      } else {
+        logger.warn("Implementation not found in the JVM for ClientProtocolMessageHandler");
+      }
+      // TODO handle multiple ClientProtocolMessageHandler impls.
+      if (protocolIterator.hasNext()) {
+        logger.warn(
+            "Multiple implementations found in the JVM for ClientProtocolMessageHandler; using the first one available.");
+      }
+    }
+
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
       // requestMsg.setUseDataStream(useDataStream);
@@ -319,11 +346,25 @@ public class ServerConnection implements Runnable {
     return executeFunctionOnLocalNodeOnly.get();
   }
 
+  /**
+   * Establishes the handshake for this connection. Every communication mode except
+   * {@code CLIENT_TO_SERVER_NEW_PROTOCOL} is delegated to {@code ServerHandShakeProcessor}; for the
+   * new protocol nothing is read from the client and a handshake is built from the socket's
+   * remote address instead.
+   *
+   * @return the result of {@code readHandShake} for the existing modes, {@code true} for the new
+   *         protocol
+   */
+  private boolean createClientHandshake() {
+    logger.info("createClientHandshake this.getCommunicationMode() " + this.getCommunicationMode());
+    final boolean usesExistingHandshake =
+        this.getCommunicationMode() != AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL;
+    if (usesExistingHandshake) {
+      return ServerHandShakeProcessor.readHandShake(this);
+    }
+
+    // New protocol: derive the client's identity from the remote end of the socket.
+    final InetSocketAddress peer = (InetSocketAddress) theSocket.getRemoteSocketAddress();
+    final DistributedMember clientMember =
+        new InternalDistributedMember(peer.getAddress(), peer.getPort());
+    this.proxyId = new ClientProxyMembershipID(clientMember);
+    this.handshake = new HandShake(this.proxyId, this.getDistributedSystem(), Version.CURRENT);
+    return true;
+  }
+
   private boolean verifyClientConnection() {
     synchronized (this.handShakeMonitor) {
       if (this.handshake == null) {
         // synchronized (getCleanupTable()) {
-        boolean readHandShake = ServerHandShakeProcessor.readHandShake(this);
+        boolean readHandShake = createClientHandshake();
         if (readHandShake) {
           if (this.handshake.isOK()) {
             try {
@@ -593,8 +634,10 @@ public class ServerConnection implements Runnable {
 
   private boolean acceptHandShake(byte epType, int qSize) {
     try {
-      this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType, qSize,
-          this.communicationMode, this.principal);
+      if (this.communicationMode != AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL) {
+        this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType,
+            qSize, this.communicationMode, this.principal);
+      }
     } catch (IOException ioe) {
       if (!crHelper.isShutdown() && !isTerminated()) {
         logger.warn(LocalizedMessage.create(
@@ -905,6 +948,22 @@ public class ServerConnection implements Runnable {
   }
 
   private void doOneMessage() {
+    boolean useNewClientProtocol =
+        this.communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL;
+    if (useNewClientProtocol) {
+      try {
+        Socket socket = this.getSocket();
+        InputStream inputStream = socket.getInputStream();
+        OutputStream outputStream = socket.getOutputStream();
+        // TODO serialization types?
+        newClientProtocol.receiveMessage(inputStream, outputStream,
+            SerializationType.STRING.deserializer, this.getCache());
+      } catch (IOException e) {
+        // TODO?
+      }
+      return;
+    }
+
     if (this.doHandshake) {
       doHandshake();
       this.doHandshake = false;
@@ -914,6 +973,7 @@ public class ServerConnection implements Runnable {
     }
   }
 
+
   private void initializeClientUserAuths() {
     this.clientUserAuths = getClientUserAuths(this.proxyId);
   }
@@ -1070,7 +1130,7 @@ public class ServerConnection implements Runnable {
   /**
    * MessageType of the messages (typically internal commands) which do not need to participate in
    * security should be added in the following if block.
-   * 
+   *
    * @return Part
    * @see AbstractOp#processSecureBytes(Connection, Message)
    * @see AbstractOp#needsUserId()
@@ -1191,6 +1251,7 @@ public class ServerConnection implements Runnable {
    * If registered with a selector then this will be the key we are registered with.
    */
   // private SelectionKey sKey = null;
+
   /**
    * Register this connection with the given selector for read events. Note that switch the channel
    * to non-blocking so it can be in a selector.
@@ -1206,7 +1267,8 @@ public class ServerConnection implements Runnable {
   }
 
   public void registerWithSelector2(Selector s) throws IOException {
-    /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this);
+    /* this.sKey = */
+    getSelectableChannel().register(s, SelectionKey.OP_READ, this);
   }
 
   /**
@@ -1229,7 +1291,6 @@ public class ServerConnection implements Runnable {
   }
 
   /**
-   *
    * @return String representing the DistributedSystemMembership of the Client VM
    */
   public String getMembershipID() {
@@ -1497,7 +1558,7 @@ public class ServerConnection implements Runnable {
 
   /**
    * Just ensure that this class gets loaded.
-   * 
+   *
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
@@ -1524,7 +1585,9 @@ public class ServerConnection implements Runnable {
     return this.name;
   }
 
-  /** returns the name of this connection */
+  /**
+   * returns the name of this connection
+   */
   public String getName() {
     return this.name;
   }
@@ -1808,7 +1871,9 @@ public class ServerConnection implements Runnable {
     return postAuthReq;
   }
 
-  /** returns the member ID byte array to be used for creating EventID objects */
+  /**
+   * returns the member ID byte array to be used for creating EventID objects
+   */
   public byte[] getEventMemberIDByteArray() {
     return this.memberIdByteArray;
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/serialization/Deserializer.java b/geode-core/src/main/java/org/apache/geode/serialization/Deserializer.java
new file mode 100644
index 0000000..e4cd20c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/serialization/Deserializer.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.serialization;
+
+/**
+ * Turns a byte array into a value of type {@code T}.
+ *
+ * @param <T> type of the value produced from the bytes
+ */
+public interface Deserializer<T> {
+  /**
+   * @param bytes serialized form of the value
+   * @return the value decoded from {@code bytes}
+   */
+  T deserialize(byte[] bytes);
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/serialization/FunctionalSerializer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/serialization/FunctionalSerializer.java b/geode-core/src/main/java/org/apache/geode/serialization/FunctionalSerializer.java
new file mode 100644
index 0000000..a3259db
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/serialization/FunctionalSerializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.serialization;
+
+/**
+ * Bundles a {@link Serializer} and a {@link Deserializer} for the same type into one object that
+ * implements both interfaces by delegating to them.
+ *
+ * @param <T> type that is serialized and deserialized
+ */
+public class FunctionalSerializer<T> implements Serializer<T>, Deserializer<T> {
+  public final Serializer<T> serializer;
+  public final Deserializer<T> deserializer;
+
+  /**
+   * @param serializer delegate used by {@link #serialize(Object)}
+   * @param deserializer delegate used by {@link #deserialize(byte[])}
+   */
+  public FunctionalSerializer(Serializer<T> serializer, Deserializer<T> deserializer) {
+    this.deserializer = deserializer;
+    this.serializer = serializer;
+  }
+
+  /** Hands {@code item} to the wrapped serializer and returns its result. */
+  @Override
+  public byte[] serialize(T item) {
+    return this.serializer.serialize(item);
+  }
+
+  /** Hands {@code bytes} to the wrapped deserializer and returns its result. */
+  @Override
+  public T deserialize(byte[] bytes) {
+    return this.deserializer.deserialize(bytes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/serialization/SerializationType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/serialization/SerializationType.java b/geode-core/src/main/java/org/apache/geode/serialization/SerializationType.java
new file mode 100644
index 0000000..2ea4066
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/serialization/SerializationType.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.serialization;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.geode.pdx.JSONFormatter;
+import org.apache.geode.pdx.PdxInstance;
+
+/**
+ * Enumerates the supported value encodings. Each constant couples the Java class it handles with
+ * the {@link Serializer} and {@link Deserializer} that convert instances of that class to and from
+ * a byte array.
+ */
+public enum SerializationType {
+  /**
+   * Strings, always encoded and decoded as UTF-8. The charset is pinned explicitly because the
+   * no-argument {@code String.getBytes()} and {@code new String(byte[])} use the JVM's platform
+   * default charset, which can differ between the two ends of a connection.
+   */
+  STRING(String.class, (text) -> text.getBytes(StandardCharsets.UTF_8),
+      (bytes) -> new String(bytes, StandardCharsets.UTF_8)),
+  /** Raw bytes, passed through as-is; the very same array instance is returned, not a copy. */
+  BYTE_BLOB(byte[].class, (x) -> x, (x) -> x),
+  /**
+   * {@link PdxInstance} values, converted with {@link JSONFormatter#toJSONByteArray} and
+   * {@code JSONFormatter.fromJSON}.
+   */
+  JSON(PdxInstance.class, JSONFormatter::toJSONByteArray, JSONFormatter::fromJSON);
+
+  // The three fields are intentionally left as raw types: parameterizing them would change what
+  // existing callers are allowed to pass to serialize() or assign from deserialize().
+
+  /** The Java class whose instances this encoding handles. */
+  public final Class klass;
+  /** Converts an instance of {@link #klass} into bytes. */
+  public final Serializer serializer;
+  /** Converts bytes back into an instance of {@link #klass}. */
+  public final Deserializer deserializer;
+
+  /**
+   * Ties a class to its converters. The type parameter makes the compiler check that all three
+   * arguments of a constant agree on the same value type.
+   *
+   * @param klass the class handled by this encoding
+   * @param serializer converter from {@code T} to bytes
+   * @param deserializer converter from bytes to {@code T}
+   */
+  <T> SerializationType(Class<T> klass, Serializer<T> serializer, Deserializer<T> deserializer) {
+    this.klass = klass;
+    this.serializer = serializer;
+    this.deserializer = deserializer;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/serialization/Serializer.java b/geode-core/src/main/java/org/apache/geode/serialization/Serializer.java
new file mode 100644
index 0000000..6be877f
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/serialization/Serializer.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.serialization;
+
+/**
+ * Converts a value of type {@code T} into a byte array.
+ *
+ * @param <T> the type of value this serializer accepts
+ */
+public interface Serializer<T> {
+  /**
+   * Encodes the given item as bytes.
+   *
+   * @param item the value to encode
+   * @return the encoded form of {@code item}
+   */
+  byte[] serialize(T item);
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
index 6fcdf4c..0272285 100644
--- a/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
@@ -593,7 +593,10 @@ public class PdxClientServerDUnitTest extends JUnit4CacheTestCase {
   }
 
   private int createServerRegion(final Class constraintClass) throws IOException {
-    CacheFactory cf = new CacheFactory(getDistributedSystemProperties());
+    Properties p = getDistributedSystemProperties();
+    p.put("log-level", "debug");
+    CacheFactory cf = new CacheFactory();
+
     Cache cache = getCache(cf);
     RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
     rf.setValueConstraint(constraintClass);

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/gradle/rat.gradle
----------------------------------------------------------------------
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
index f8018b6..7e1d61d 100644
--- a/gradle/rat.gradle
+++ b/gradle/rat.gradle
@@ -120,11 +120,7 @@ rat {
     'geode-core/src/test/resources/org/apache/geode/management/internal/configuration/domain/CacheElementJUnitTest.xml',
     'geode-core/src/test/resources/org/apache/geode/management/internal/configuration/utils/*.xml',
 
-    '**/META-INF/services/org.xml.sax.ext.EntityResolver2',
-    '**/META-INF/services/org.apache.geode.internal.cache.CacheService',
-    '**/META-INF/services/org.apache.geode.internal.cache.xmlcache.XmlParser',
-    '**/META-INF/services/org.apache.geode.distributed.ServerLauncherCacheProvider',
-    '**/META-INF/services/org.springframework.shell.core.CommandMarker',
+    '**/META-INF/services/*',
 
     // --- Other Licenses ---
 

http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index c0fdb6e..3efc2dc 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,6 +16,7 @@
  */
 rootProject.name = 'geode'
 
+include 'geode-client-protobuf'
 include 'geode-old-versions'
 include 'geode-common'
 include 'geode-json'


Mime
View raw message