Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1ECE0200C8C for ; Mon, 22 May 2017 20:29:10 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1DA46160BE1; Mon, 22 May 2017 18:29:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 310CF160BF4 for ; Mon, 22 May 2017 20:29:07 +0200 (CEST) Received: (qmail 65621 invoked by uid 500); 22 May 2017 18:29:02 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 62878 invoked by uid 99); 22 May 2017 18:28:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 May 2017 18:28:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5D245E1863; Mon, 22 May 2017 18:28:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: udo@apache.org To: commits@geode.apache.org Date: Mon, 22 May 2017 18:30:03 -0000 Message-Id: <4b44fce2e14e40a18a310bf27507caaa@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [67/69] [abbrv] geode git commit: GEODE-2582: New client can send a Put request with Protobuf IDL. archived-at: Mon, 22 May 2017 18:29:10 -0000 GEODE-2582: New client can send a Put request with Protobuf IDL. * Update the proto IDL * Get a put request working. Currently this only uses String serialization; serialization protocols still have to be implemented. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/511e2a35 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/511e2a35 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/511e2a35 Branch: refs/heads/feature/GEODE-2580 Commit: 511e2a351d4be68b7741727abc34d5147584dff7 Parents: 64798a5 Author: Galen OSullivan Authored: Thu Mar 16 16:26:03 2017 -0700 Committer: Udo Kohlmeyer Committed: Mon May 22 11:27:03 2017 -0700 ---------------------------------------------------------------------- geode-assembly/build.gradle | 9 +- geode-client-protobuf/build.gradle | 88 ++++++++++ .../client/NewClientProtocolTestClient.java | 103 +++++++++++ .../client/ProtobufProtocolMessageHandler.java | 175 +++++++++++++++++++ .../src/main/proto/basicTypes.proto | 45 +++++ .../src/main/proto/clientProtocol.proto | 64 +++++++ .../src/main/proto/region_API.proto | 63 +++++++ ...he.tier.sockets.ClientProtocolMessageHandler | 1 + .../geode/protocol/client/MessageUtils.java | 92 ++++++++++ .../client/ProtobufProtocolIntegrationTest.java | 85 +++++++++ ...rotobufSerializationDeserializationTest.java | 101 +++++++++++ .../client/internal/ConnectionFactoryImpl.java | 3 +- .../cache/client/internal/ConnectionImpl.java | 17 +- .../geode/internal/cache/tier/Acceptor.java | 5 + .../cache/tier/sockets/AcceptorImpl.java | 7 +- .../sockets/ClientProtocolMessageHandler.java | 28 +++ .../internal/cache/tier/sockets/HandShake.java | 21 +++ .../cache/tier/sockets/ServerConnection.java | 85 +++++++-- .../geode/serialization/Deserializer.java | 20 +++ .../serialization/FunctionalSerializer.java | 36 ++++ .../geode/serialization/SerializationType.java | 36 ++++ .../apache/geode/serialization/Serializer.java | 20 +++ .../geode/pdx/PdxClientServerDUnitTest.java | 5 +- gradle/rat.gradle | 6 +- settings.gradle | 1 + 25 files changed, 1094 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-assembly/build.gradle ---------------------------------------------------------------------- diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle index a4f0c69..6c626a0 100755 --- a/geode-assembly/build.gradle +++ b/geode-assembly/build.gradle @@ -61,6 +61,7 @@ dependencies { archives project(':geode-wan') archives project(':geode-cq') archives project(':geode-rebalancer') + archives project(':geode-client-protobuf') testCompile project(':geode-junit') testCompile project(':geode-pulse') @@ -176,7 +177,10 @@ def cp = { it.contains('lucene-analyzers-common') || it.contains('lucene-core') || it.contains('lucene-queries') || - it.contains('lucene-queryparser') + it.contains('lucene-queryparser') || + + // dependencies from geode-client-protobuf + it.contains('protobuf-java') } } }.flatten().unique().join(' ') @@ -335,6 +339,9 @@ distributions { from project(":geode-lucene").configurations.runtime from project(":geode-lucene").configurations.archives.allArtifacts.files + + from project(":geode-client-protobuf").configurations.runtime + from project(":geode-client-protobuf").configurations.archives.allArtifacts.files from project(":geode-old-client-support").configurations.runtime from project(":geode-old-client-support").configurations.archives.allArtifacts.files http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/build.gradle ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/build.gradle b/geode-client-protobuf/build.gradle new file mode 100644 index 0000000..c82f311 --- /dev/null +++ b/geode-client-protobuf/build.gradle @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +apply plugin: 'java' +apply plugin: 'com.google.protobuf' +apply plugin: 'idea' + +repositories { + maven { url "https://plugins.gradle.org/m2/" } +} + +buildscript { + repositories { + maven { url "https://plugins.gradle.org/m2/" } + } + dependencies { + classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.1' + } +} + +dependencies { + compile 'com.google.protobuf:protobuf-java:3.0.0' + //compile 'io.grpc:grpc-stub:1.0.0-pre2' + //compile 'io.grpc:grpc-protobuf:1.0.0-pre2' + // Extra proto source files besides the ones residing under + // "src/main". + protobuf files("main/proto/") + protobuf fileTree("ext/") + + provided project(':geode-core') + + testCompile 'junit:junit:4.12' + testCompile project (':geode-junit') + testCompile "org.mockito:mockito-core:2.+" + testCompile files(project(':geode-core').sourceSets.test.output) + // Extra proto source files for test besides the ones residing under + // "src/test". + //testProtobuf files("lib/protos-test.tar.gz") +} + +protobuf { + protoc { + // The artifact spec for the Protobuf Compiler + artifact = 'com.google.protobuf:protoc:3.0.0' + } + // this allows our spotless rule to skip this directory (hopefully rat too) + generatedFilesBaseDir = "$buildDir/generated-src/proto" + +// plugins { +// // Optional: an artifact spec for a protoc plugin, with "grpc" as +// // the identifier, which can be referred to in the "plugins" +// // container of the "generateProtoTasks" closure. +// grpc { +// artifact = 'io.grpc:protoc-gen-grpc-java:1.0.0-pre2' +// } +// } +// generateProtoTasks { +// ofSourceSet('main')*.plugins { +// // Apply the "grpc" plugin whose spec is defined above, without +// // options. Note the braces cannot be omitted, otherwise the +// // plugin will not be added. This is because of the implicit way +// // NamedDomainObjectContainer binds the methods. +// grpc { } +// } +// } +} + +// let IntelliJ know where the generated sources are. +idea { + module { + sourceDirs += file("${protobuf.generatedFilesBaseDir}/main/java") + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java new file mode 100644 index 0000000..a4476e1 --- /dev/null +++ b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.protocol.client; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.ClientProtocol.Message; +import org.apache.geode.protocol.protobuf.RegionAPI; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Random; + + +public class NewClientProtocolTestClient implements AutoCloseable { + private static SocketChannel socketChannel; + private final OutputStream outputStream; + private final InputStream inputStream; + + public NewClientProtocolTestClient(String hostname, int port) throws IOException { + socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); + inputStream = socketChannel.socket().getInputStream(); + outputStream = socketChannel.socket().getOutputStream(); + } + + @Override + public void close() throws IOException { + socketChannel.close(); + } + + private static void sendHeader(OutputStream outputStream) throws IOException { + outputStream.write(AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL); + } + + public Message blockingSendMessage(Message message) throws IOException { + sendHeader(outputStream); + + message.writeDelimitedTo(outputStream); + outputStream.flush(); + + return ClientProtocol.Message.parseDelimitedFrom(inputStream); + } + + void parseResponse(Message response) { + System.out.println("response = " + response.toString()); + } + + private Message generateMessage() { + Random random = new Random(); + ClientProtocol.MessageHeader.Builder messageHeader = + ClientProtocol.MessageHeader.newBuilder().setCorrelationId(random.nextInt()); + // .setSize() //we don't need to set the size because Protobuf will handle the message frame + + BasicTypes.Key.Builder key = + BasicTypes.Key.newBuilder().setKey(ByteString.copyFrom(createByteArrayOfSize(64))); + + BasicTypes.Value.Builder value = + BasicTypes.Value.newBuilder().setValue(ByteString.copyFrom(createByteArrayOfSize(512))); + + RegionAPI.PutRequest.Builder putRequestBuilder = + RegionAPI.PutRequest.newBuilder().setRegionName("TestRegion") + .setEntry(BasicTypes.Entry.newBuilder().setKey(key).setValue(value)); + + ClientProtocol.Request.Builder request = + ClientProtocol.Request.newBuilder().setPutRequest(putRequestBuilder); + + Message.Builder message = + Message.newBuilder().setMessageHeader(messageHeader).setRequest(request); + + return message.build(); + } + + private static byte[] createByteArrayOfSize(int msgSize) { + byte[] array = new byte[msgSize]; + for (int i = 0; i < msgSize; i++) { + array[i] = 'a'; + } + return array; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java new file mode 100644 index 0000000..a6993c4 --- /dev/null +++ b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.protocol.client; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Parser; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.TimeoutException; +import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.RegionAPI.GetRequest; +import org.apache.geode.protocol.protobuf.RegionAPI.GetResponse; +import org.apache.geode.protocol.protobuf.RegionAPI.PutResponse; +import org.apache.geode.serialization.Deserializer; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import static org.apache.geode.protocol.protobuf.ClientProtocol.Message; +import static org.apache.geode.protocol.protobuf.ClientProtocol.Request; +import static org.apache.geode.protocol.protobuf.ClientProtocol.Response; +import static org.apache.geode.protocol.protobuf.RegionAPI.PutRequest; + +public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHandler { + + private static final Logger logger = LogService.getLogger(); + + private String ErrorMessageFromMessage(Message message) { + return "Error parsing message, message string: " + message.toString(); + } + + @Override + public void receiveMessage(InputStream inputStream, OutputStream outputStream, + Deserializer deserializer, Cache cache) throws IOException { + final Message message = Message.parseDelimitedFrom(inputStream); + // can be null at EOF, see Parser.parseDelimitedFrom(java.io.InputStream) + if (message == null) { + return; + } + + if (message.getMessageTypeCase() != Message.MessageTypeCase.REQUEST) { + // TODO + logger.error(() -> "Got message of type response: " + ErrorMessageFromMessage(message)); + } + + Request request = message.getRequest(); + Message putResponseMessage = doPutRequest(request.getPutRequest(), deserializer, cache); + + putResponseMessage.writeDelimitedTo(outputStream); + } + + private Message doPutRequest(PutRequest request, Deserializer dataDeserializer, Cache cache) { + logger.error("Doing put request."); + final String regionName = request.getRegionName(); + final BasicTypes.Entry entry = request.getEntry(); + final ByteString key = entry.getKey().getKey(); + final ByteString value = entry.getValue().getValue(); + + final Region region = cache.getRegion(regionName); + try { + region.put(dataDeserializer.deserialize(key.toByteArray()), + dataDeserializer.deserialize(value.toByteArray())); + return putResponseWithStatus(true); + } catch (TimeoutException | CacheWriterException ex) { + logger.error("Caught normal-ish exception doing region put", ex); + return putResponseWithStatus(false); + } + } + + private Message putResponseWithStatus(boolean ok) { + return Message.newBuilder() + .setResponse(Response.newBuilder().setPutResponse(PutResponse.newBuilder().setSuccess(ok))) + .build(); + } + + private GetResponse doGetRequest(GetRequest request, Deserializer deserializer, Cache cache) { + // TODO + return null; + } + + public ProtobufProtocolMessageHandler() {} +} + + +// public final class NewClientProtocol { +// public static void recvMessage(Cache cache, InputStream inputStream, OutputStream outputStream) { +// try { +// final DataInputStream dataInputStream = new DataInputStream(inputStream); +// final DataOutputStream dataOutputStream = new DataOutputStream(outputStream); +// +// RequestHeader header = new RequestHeader(dataInputStream); +// +// // todo: string -- assume UTF-8 from here. +// // Java modified UTF-8: unsigned short len. hope we don't run into the "modified" part. +// switch (header.requestType) { +// case MessageType.PUT: +// servePutRequest(header, cache, dataInputStream, dataOutputStream); +// break; +// case MessageType.REQUEST: // this is a GET request. +// serveGetRequest(cache, dataInputStream, dataOutputStream); +// break; +// } +// } catch (IOException e) { +// e.printStackTrace(); +// // todo error handling. +// } +// } +// +// private static void serveGetRequest(Cache cache, DataInputStream dataInputStream, +// DataOutputStream dataOutputStream) throws IOException { +// // GetRequest: Header RegionName Key CallbackArg +// final String regionName = readString(dataInputStream); +// final String key = readString(dataInputStream); +// // todo no callback arg for now +// final Region region = cache.getRegion(regionName); +// // todo anything more complex? +// +// Object val = region.get(key); +// if (val == null) { +// byte[] bytes = "Entry not found in region.".getBytes(); +// dataOutputStream.writeInt(bytes.length); // len +// dataOutputStream.writeByte(19); // ENTRY_NOT_FOUND_EXCEPTION +// dataOutputStream.write(bytes); +// } else { +// byte[] bytes = val.toString().getBytes(); +// dataOutputStream.writeInt(bytes.length); // len +// dataOutputStream.writeByte(1); // RESPONSE +// dataOutputStream.write(bytes); +// } +// dataOutputStream.flush(); +// } +// +// // response: size responseType requestId +// +// private static void servePutRequest(RequestHeader header, Cache cache, +// DataInputStream dataInputStream, DataOutputStream dataOutputStream) throws IOException { +// String regionName = readString(dataInputStream); +// // assume every object is a string. +// +// String key = readString(dataInputStream); +// // TODO: value header, callback arg? +// String value = readString(dataInputStream); +// +// Region region = cache.getRegion(regionName); +// region.put(key, value); +// +// dataOutputStream.writeInt(0); // len +// dataOutputStream.writeByte(1); // RESPONSE +// dataOutputStream.writeInt(header.requestId); +// dataOutputStream.flush(); +// } +// +// private static String readString(DataInputStream inputStream) throws IOException { +// String s = inputStream.readUTF(); +// return s; +// } +// } http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/proto/basicTypes.proto ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/main/proto/basicTypes.proto b/geode-client-protobuf/src/main/proto/basicTypes.proto new file mode 100644 index 0000000..7cb4204 --- /dev/null +++ b/geode-client-protobuf/src/main/proto/basicTypes.proto @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +syntax = "proto3"; +package org.apache.geode.protocol.protobuf; + +import "google/protobuf/any.proto"; + +message Entry{ + Key key = 1; + Value value = 2; +} + +message Key { + bytes key = 1; +} + +message Value { + ValueHeader valueHeader = 1; + bytes value = 2; +} + +message ValueHeader { + int32 size = 1; + bool isPartial = 2; +} + +message CallbackArguments{ + oneof callbackArgs { + bool hasCallbackArgument = 1; + bytes callbackBytes = 2; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/proto/clientProtocol.proto ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/main/proto/clientProtocol.proto b/geode-client-protobuf/src/main/proto/clientProtocol.proto new file mode 100644 index 0000000..7e1e7bb --- /dev/null +++ b/geode-client-protobuf/src/main/proto/clientProtocol.proto @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +syntax = "proto3"; +package org.apache.geode.protocol.protobuf; + +import "google/protobuf/any.proto"; +import "region_API.proto"; + +message Message { + MessageHeader messageHeader = 1; + oneof messageType { + Request request = 2; + Response response = 3; + } +} + +message MessageHeader { + int32 correlationId = 2; + MetaData metadata = 1; +} + +message Request { + oneof requestAPI { + PutRequest putRequest = 1; + GetRequest getRequest = 2; + PutAllRequest putAllRequest = 3; + GetAllRequest getAllRequest = 4; + } +} + +message Response { + ResponseHeader responseHeader = 1; + oneof responseAPI { + PutResponse putResponse = 2; + GetResponse getResponse = 3; + PutAllResponse putAllResponse = 4; + GetAllResponse getAllResponse = 5; + } +} + +message ResponseHeader { + oneof reponseType { + int32 responseTypeID = 1; + int32 errorCode = 2; + } +} + +message MetaData { + int32 numberOfMetadata = 1; + map metaDataEntries = 2; +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/proto/region_API.proto ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/main/proto/region_API.proto b/geode-client-protobuf/src/main/proto/region_API.proto new file mode 100644 index 0000000..d46e0e6 --- /dev/null +++ b/geode-client-protobuf/src/main/proto/region_API.proto @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +syntax = "proto3"; +package org.apache.geode.protocol.protobuf; + +import "basicTypes.proto"; + +message PutRequest { + string regionName = 1; + Entry entry = 2; + CallbackArguments callbackArg = 3; +} + +message PutResponse { + bool success = 1; +} + +message GetRequest { + string regionName = 1; + Key key = 2; + CallbackArguments callbackArg = 3; +} + +message GetResponse { + Value result = 1; +} + +message PutAllRequest { + string regionName = 1; + int32 numberOfEntries = 2; + repeated Entry entry = 3; + CallbackArguments callbackArg = 4; +} + +message PutAllResponse { + int32 numberOfKeysFailed = 1; + repeated Key key = 2; +} + +message GetAllRequest { + string regionName = 1; + int32 numberOfKeys = 2; + repeated Key key = 3; + CallbackArguments callbackArg = 4; +} + +message GetAllResponse { + int32 numberOfEntries = 1; + repeated Entry entries = 2; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler b/geode-client-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler new file mode 100644 index 0000000..2afaf73 --- /dev/null +++ b/geode-client-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler @@ -0,0 +1 @@ +org.apache.geode.protocol.client.ProtobufProtocolMessageHandler http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java new file mode 100644 index 0000000..02e2a58 --- /dev/null +++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.protocol.client; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.RegionAPI; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Random; + +public class MessageUtils { + public static ByteArrayInputStream loadMessageIntoInputStream(ClientProtocol.Message message) + throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + message.writeDelimitedTo(byteArrayOutputStream); + byte[] messageByteArray = byteArrayOutputStream.toByteArray(); + return new ByteArrayInputStream(messageByteArray); + } + + public static ClientProtocol.Message makePutMessage() { + Random random = new Random(); + ClientProtocol.MessageHeader.Builder messageHeader = + ClientProtocol.MessageHeader.newBuilder().setCorrelationId(random.nextInt()); + + BasicTypes.Key.Builder key = + BasicTypes.Key.newBuilder().setKey(ByteString.copyFrom(createByteArrayOfSize(64))); + + BasicTypes.Value.Builder value = + BasicTypes.Value.newBuilder().setValue(ByteString.copyFrom(createByteArrayOfSize(512))); + + RegionAPI.PutRequest.Builder putRequestBuilder = + RegionAPI.PutRequest.newBuilder().setRegionName("TestRegion") + .setEntry(BasicTypes.Entry.newBuilder().setKey(key).setValue(value)); + + ClientProtocol.Request.Builder request = + ClientProtocol.Request.newBuilder().setPutRequest(putRequestBuilder); + + ClientProtocol.Message.Builder message = + ClientProtocol.Message.newBuilder().setMessageHeader(messageHeader).setRequest(request); + + return message.build(); + } + + public static ClientProtocol.Message makePutMessageFor(String region, String key, String value) { + Random random = new Random(); + ClientProtocol.MessageHeader.Builder messageHeader = + ClientProtocol.MessageHeader.newBuilder().setCorrelationId(random.nextInt()); + + BasicTypes.Key.Builder keyBuilder = + BasicTypes.Key.newBuilder().setKey(ByteString.copyFromUtf8(key)); + + BasicTypes.Value.Builder valueBuilder = + BasicTypes.Value.newBuilder().setValue(ByteString.copyFromUtf8(value)); + + RegionAPI.PutRequest.Builder putRequestBuilder = + RegionAPI.PutRequest.newBuilder().setRegionName(region) + .setEntry(BasicTypes.Entry.newBuilder().setKey(keyBuilder).setValue(valueBuilder)); + + ClientProtocol.Request.Builder request = + ClientProtocol.Request.newBuilder().setPutRequest(putRequestBuilder); + ClientProtocol.Message.Builder message = + ClientProtocol.Message.newBuilder().setMessageHeader(messageHeader).setRequest(request); + + return message.build(); + } + + private static byte[] createByteArrayOfSize(int msgSize) { + byte[] array = new byte[msgSize]; + for (int i = 0; i < msgSize; i++) { + array[i] = 'a'; + } + return array; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java new file mode 100644 index 0000000..5d92cf0 --- /dev/null +++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.protocol.client; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.test.junit.categories.IntegrationTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class ProtobufProtocolIntegrationTest { + @Test + public void testRoundTripClientCommunicationWorks() throws IOException { + try (Cache cache = createCacheOnPort(40404); + NewClientProtocolTestClient client = new NewClientProtocolTestClient("localhost", 40404)) { + final String testRegion = "testRegion"; + final String testKey = "testKey"; + final String testValue = "testValue"; + Region region = cache.createRegionFactory().create("testRegion"); + + ClientProtocol.Message message = + MessageUtils.makePutMessageFor(testRegion, testKey, testValue); + ClientProtocol.Message response = client.blockingSendMessage(message); + client.parseResponse(response); + + assertEquals(response.getMessageTypeCase(), ClientProtocol.Message.MessageTypeCase.RESPONSE); + assertEquals(response.getResponse().getResponseAPICase(), + ClientProtocol.Response.ResponseAPICase.PUTRESPONSE); + assertTrue(response.getResponse().getPutResponse().getSuccess()); + + assertEquals(1, region.size()); + assertTrue(region.containsKey(testKey)); + assertEquals(testValue, region.get(testKey)); + } + } + + @Test + public void startCache() throws IOException { + try (Cache cache = createCacheOnPort(40404)) { + while (true) { + try { + Thread.sleep(100000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + private Cache createCacheOnPort(int port) throws IOException { + Properties props = new Properties(); + props.setProperty(ConfigurationProperties.TCP_PORT, Integer.toString(port)); + props.setProperty(ConfigurationProperties.BIND_ADDRESS, "localhost"); + CacheFactory cf = new CacheFactory(props); + Cache cache = cf.create(); + CacheServer cacheServer = cache.addCacheServer(); + cacheServer.setBindAddress("localhost"); + cacheServer.start(); + return cache; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java new file mode 100644 index 0000000..8cf36ca --- /dev/null +++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.protocol.client; + + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.serialization.Deserializer; +import org.apache.geode.serialization.SerializationType; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import static org.apache.geode.protocol.client.MessageUtils.makePutMessage; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; + + +@Category(UnitTest.class) +public class ProtobufSerializationDeserializationTest { + @Test + public void protobufSerializationSmokeTest() throws InvalidProtocolBufferException { + ClientProtocol.Message message = makePutMessage(); + byte[] bytes = message.toByteArray(); + ClientProtocol.Message message1 = ClientProtocol.Message.parseFrom(bytes); + assertEquals(message, message1); + } + + /** + * Given a serialized message that we've built, verify that the server part does the right call to + * the Cache it gets passed. + */ + @Test + public void testNewClientProtocolPutsOnPutMessage() throws IOException { + final String testRegion = "testRegion"; + final String testKey = "testKey"; + final String testValue = "testValue"; + ClientProtocol.Message message = MessageUtils.makePutMessageFor(testRegion, testKey, testValue); + + Deserializer deserializer = SerializationType.BYTE_BLOB.deserializer; + Cache mockCache = Mockito.mock(Cache.class); + Region mockRegion = Mockito.mock(Region.class); + when(mockCache.getRegion("testRegion")).thenReturn(mockRegion); + OutputStream mockOutputStream = Mockito.mock(OutputStream.class); + + ProtobufProtocolMessageHandler newClientProtocol = new ProtobufProtocolMessageHandler(); + newClientProtocol.receiveMessage(MessageUtils.loadMessageIntoInputStream(message), + mockOutputStream, deserializer, mockCache); + + verify(mockRegion).put(testKey.getBytes(), testValue.getBytes()); + } + + @Test + public void testServerRespondsToPutMessage() throws IOException { + final String testRegion = "testRegion"; + final String testKey = "testKey"; + final String testValue = "testValue"; + ClientProtocol.Message message = MessageUtils.makePutMessageFor(testRegion, testKey, testValue); + + Deserializer deserializer = SerializationType.BYTE_BLOB.deserializer; + Cache mockCache = Mockito.mock(Cache.class); + Region mockRegion = Mockito.mock(Region.class); + when(mockCache.getRegion("testRegion")).thenReturn(mockRegion); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(128); + + ProtobufProtocolMessageHandler newClientProtocol = new ProtobufProtocolMessageHandler(); + newClientProtocol.receiveMessage(MessageUtils.loadMessageIntoInputStream(message), outputStream, + deserializer, mockCache); + + ClientProtocol.Message responseMessage = ClientProtocol.Message + .parseDelimitedFrom(new ByteArrayInputStream(outputStream.toByteArray())); + + assertEquals(responseMessage.getMessageTypeCase(), + ClientProtocol.Message.MessageTypeCase.RESPONSE); + assertEquals(responseMessage.getResponse().getResponseAPICase(), + ClientProtocol.Response.ResponseAPICase.PUTRESPONSE); + assertTrue(responseMessage.getResponse().getPutResponse().getSuccess()); + } +} + http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java index a419d57..fc9eab1 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java @@ -116,7 +116,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory { } else if (forQueue) { return Acceptor.CLIENT_TO_SERVER_FOR_QUEUE; } else { - return Acceptor.CLIENT_TO_SERVER; + // return Acceptor.CLIENT_TO_SERVER; + return Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL; } } http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java index f71b79b..35bbcef 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java @@ -17,6 +17,7 @@ package org.apache.geode.cache.client.internal; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; @@ -31,16 +32,19 @@ import org.apache.geode.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionO import org.apache.geode.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl; import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl; import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; import org.apache.geode.internal.cache.tier.sockets.HandShake; import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -import org.apache.geode.internal.net.SocketCreator; /** * A single client to server connection. @@ -114,7 +118,16 @@ public class ConnectionImpl implements Connection { theSocket.setSoTimeout(handShakeTimeout); out = theSocket.getOutputStream(); in = theSocket.getInputStream(); - this.status = handShake.handshakeWithServer(this, location, communicationMode); + + if (communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL) { + handShake.writeNewProtcolVersionForServer(this, communicationMode); + InetSocketAddress remoteAddr = (InetSocketAddress) theSocket.getRemoteSocketAddress(); + DistributedMember distributedMember = + new InternalDistributedMember(remoteAddr.getAddress(), remoteAddr.getPort()); + this.status = new ServerQueueStatus(distributedMember); + } else { + this.status = handShake.handshakeWithServer(this, location, communicationMode); + } commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket); if (sender != null) { commBufferForAsyncRead = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket); http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java index 9a3241b..26df0e2 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java @@ -71,6 +71,11 @@ public abstract class Acceptor { */ public static final byte CLIENT_TO_SERVER_FOR_QUEUE = (byte) 107; + /** + * For new client-server protocol which ignores current handshake mechanism + */ + public static final byte CLIENT_TO_SERVER_NEW_PROTOCOL = (byte) 110; + /** * The GFE version of the server. http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 9658f98..33596ff 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -1424,8 +1424,8 @@ public class AcceptorImpl extends Acceptor implements Runnable { s.setTcpNoDelay(this.tcpNoDelay); if (communicationMode == CLIENT_TO_SERVER || communicationMode == GATEWAY_TO_GATEWAY - || communicationMode == MONITOR_TO_SERVER - || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE) { + || communicationMode == MONITOR_TO_SERVER || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE + || communicationMode == CLIENT_TO_SERVER_NEW_PROTOCOL) { String communicationModeStr = ""; switch (communicationMode) { case CLIENT_TO_SERVER: @@ -1440,6 +1440,9 @@ public class AcceptorImpl extends Acceptor implements Runnable { case CLIENT_TO_SERVER_FOR_QUEUE: communicationModeStr = "clientToServerForQueue"; break; + case CLIENT_TO_SERVER_NEW_PROTOCOL: + communicationModeStr = "clientToServerForNewProtocol"; + break; } if (logger.isDebugEnabled()) { logger.debug("Bridge server: Initializing {} communication socket: {}", http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java new file mode 100644 index 0000000..aa6d4cb --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.cache.Cache; +import org.apache.geode.serialization.Deserializer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public interface ClientProtocolMessageHandler { + void receiveMessage(InputStream inputStream, OutputStream outputStream, Deserializer serializer, + Cache cache) throws IOException; +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java index 388f838..77a3be9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java @@ -337,6 +337,14 @@ public class HandShake implements ClientHandShake { this.credentials = null; } + public HandShake(ClientProxyMembershipID id, DistributedSystem sys, Version v) { + this.id = id; + this.code = REPLY_OK; + this.system = sys; + this.credentials = null; + this.clientVersion = v; + } + public void updateProxyID(InternalDistributedMember idm) { this.id.updateID(idm); } @@ -1177,6 +1185,19 @@ public class HandShake implements ClientHandShake { return new InternalDistributedMember(sock.getInetAddress(), sock.getPort(), false); } + public void writeNewProtcolVersionForServer(Connection conn, byte communicationMode) + throws IOException { + Socket sock = conn.getSocket(); + try { + DataOutputStream dos = new DataOutputStream(sock.getOutputStream()); + dos.writeByte(communicationMode); + } catch (IOException ex) { + CancelCriterion stopper = this.system.getCancelCriterion(); + stopper.checkCancelInProgress(null); + throw ex; + } + } + /** * Client-side handshake with a Server */ http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 83d0e9d..e58e213 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -19,19 +19,25 @@ import static org.apache.geode.distributed.ConfigurationProperties.*; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.security.Principal; +import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.geode.serialization.SerializationType; import org.apache.logging.log4j.Logger; import org.apache.shiro.subject.Subject; import org.apache.shiro.util.ThreadState; @@ -42,6 +48,7 @@ import org.apache.geode.SystemFailure; import org.apache.geode.cache.Cache; import org.apache.geode.cache.client.internal.AbstractOp; import org.apache.geode.cache.client.internal.Connection; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; @@ -147,6 +154,9 @@ public class ServerConnection implements Runnable { private final CachedRegionHelper crHelper; private String name = null; + // The new protocol lives in a separate module and gets loaded when this class is instantiated. + private static ClientProtocolMessageHandler newClientProtocol; + // IMPORTANT: if new messages are added change setHandshake to initialize them // to the correct Version for serializing to the client private Message requestMsg = new Message(2, Version.CURRENT); @@ -180,7 +190,9 @@ public class ServerConnection implements Runnable { */ private volatile int requestSpecificTimeout = -1; - /** Tracks the id of the most recent batch to which a reply has been sent */ + /** + * Tracks the id of the most recent batch to which a reply has been sent + */ private int latestBatchIdReplied = -1; /* @@ -276,6 +288,21 @@ public class ServerConnection implements Runnable { this.postAuthzRequest = null; this.randomConnectionIdGen = new Random(this.hashCode()); + if (newClientProtocol == null) { + Iterator protocolIterator = + ServiceLoader.load(ClientProtocolMessageHandler.class).iterator(); + if (protocolIterator.hasNext()) { + newClientProtocol = protocolIterator.next(); + } else { + logger.warn("Implementation not found in the JVM for ClientProtocolMessageHandler"); + } + // TODO handle multiple ClientProtocolMessageHandler impls. + if (protocolIterator.hasNext()) { + logger.warn( + "Multiple implementations found in the JVM for ClientProtocolMessageHandler; using the first one available."); + } + } + final boolean isDebugEnabled = logger.isDebugEnabled(); try { // requestMsg.setUseDataStream(useDataStream); @@ -319,11 +346,25 @@ public class ServerConnection implements Runnable { return executeFunctionOnLocalNodeOnly.get(); } + private boolean createClientHandshake() { + logger.info("createClientHandshake this.getCommunicationMode() " + this.getCommunicationMode()); + if (this.getCommunicationMode() != AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL) { + return ServerHandShakeProcessor.readHandShake(this); + } else { + InetSocketAddress remoteAddress = (InetSocketAddress) theSocket.getRemoteSocketAddress(); + DistributedMember member = + new InternalDistributedMember(remoteAddress.getAddress(), remoteAddress.getPort()); + this.proxyId = new ClientProxyMembershipID(member); + this.handshake = new HandShake(this.proxyId, this.getDistributedSystem(), Version.CURRENT); + return true; + } + } + private boolean verifyClientConnection() { synchronized (this.handShakeMonitor) { if (this.handshake == null) { // synchronized (getCleanupTable()) { - boolean readHandShake = ServerHandShakeProcessor.readHandShake(this); + boolean readHandShake = createClientHandshake(); if (readHandShake) { if (this.handshake.isOK()) { try { @@ -593,8 +634,10 @@ public class ServerConnection implements Runnable { private boolean acceptHandShake(byte epType, int qSize) { try { - this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType, qSize, - this.communicationMode, this.principal); + if (this.communicationMode != AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL) { + this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType, + qSize, this.communicationMode, this.principal); + } } catch (IOException ioe) { if (!crHelper.isShutdown() && !isTerminated()) { logger.warn(LocalizedMessage.create( @@ -905,6 +948,22 @@ public class ServerConnection implements Runnable { } private void doOneMessage() { + boolean useNewClientProtocol = + this.communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL; + if (useNewClientProtocol) { + try { + Socket socket = this.getSocket(); + InputStream inputStream = socket.getInputStream(); + OutputStream outputStream = socket.getOutputStream(); + // TODO serialization types? + newClientProtocol.receiveMessage(inputStream, outputStream, + SerializationType.STRING.deserializer, this.getCache()); + } catch (IOException e) { + // TODO? + } + return; + } + if (this.doHandshake) { doHandshake(); this.doHandshake = false; @@ -914,6 +973,7 @@ public class ServerConnection implements Runnable { } } + private void initializeClientUserAuths() { this.clientUserAuths = getClientUserAuths(this.proxyId); } @@ -1070,7 +1130,7 @@ public class ServerConnection implements Runnable { /** * MessageType of the messages (typically internal commands) which do not need to participate in * security should be added in the following if block. - * + * * @return Part * @see AbstractOp#processSecureBytes(Connection, Message) * @see AbstractOp#needsUserId() @@ -1191,6 +1251,7 @@ public class ServerConnection implements Runnable { * If registered with a selector then this will be the key we are registered with. */ // private SelectionKey sKey = null; + /** * Register this connection with the given selector for read events. Note that switch the channel * to non-blocking so it can be in a selector. @@ -1206,7 +1267,8 @@ public class ServerConnection implements Runnable { } public void registerWithSelector2(Selector s) throws IOException { - /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this); + /* this.sKey = */ + getSelectableChannel().register(s, SelectionKey.OP_READ, this); } /** @@ -1229,7 +1291,6 @@ public class ServerConnection implements Runnable { } /** - * * @return String representing the DistributedSystemMembership of the Client VM */ public String getMembershipID() { @@ -1497,7 +1558,7 @@ public class ServerConnection implements Runnable { /** * Just ensure that this class gets loaded. - * + * * @see SystemFailure#loadEmergencyClasses() */ public static void loadEmergencyClasses() { @@ -1524,7 +1585,9 @@ public class ServerConnection implements Runnable { return this.name; } - /** returns the name of this connection */ + /** + * returns the name of this connection + */ public String getName() { return this.name; } @@ -1808,7 +1871,9 @@ public class ServerConnection implements Runnable { return postAuthReq; } - /** returns the member ID byte array to be used for creating EventID objects */ + /** + * returns the member ID byte array to be used for creating EventID objects + */ public byte[] getEventMemberIDByteArray() { return this.memberIdByteArray; } http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/serialization/Deserializer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/serialization/Deserializer.java b/geode-core/src/main/java/org/apache/geode/serialization/Deserializer.java new file mode 100644 index 0000000..e4cd20c --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/serialization/Deserializer.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.serialization; + +public interface Deserializer { + public abstract T deserialize(byte[] bytes); +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/serialization/FunctionalSerializer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/serialization/FunctionalSerializer.java b/geode-core/src/main/java/org/apache/geode/serialization/FunctionalSerializer.java new file mode 100644 index 0000000..a3259db --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/serialization/FunctionalSerializer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.serialization; + +public class FunctionalSerializer implements Serializer, Deserializer { + public final Serializer serializer; + public final Deserializer deserializer; + + public FunctionalSerializer(Serializer serializer, Deserializer deserializer) { + this.serializer = serializer; + this.deserializer = deserializer; + } + + @Override + public T deserialize(byte[] bytes) { + return deserializer.deserialize(bytes); + } + + @Override + public byte[] serialize(T item) { + return serializer.serialize(item); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/serialization/SerializationType.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/serialization/SerializationType.java b/geode-core/src/main/java/org/apache/geode/serialization/SerializationType.java new file mode 100644 index 0000000..2ea4066 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/serialization/SerializationType.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.serialization; + +import org.apache.geode.pdx.JSONFormatter; +import org.apache.geode.pdx.PdxInstance; + +public enum SerializationType { + STRING(String.class, String::getBytes, String::new), + BYTE_BLOB(byte[].class, (x) -> x, (x) -> x), + JSON(PdxInstance.class, JSONFormatter::toJSONByteArray, JSONFormatter::fromJSON); + + public final Class klass; + public final Serializer serializer; + public final Deserializer deserializer; + + SerializationType(Class klass, Serializer serializer, Deserializer deserializer) { + this.klass = klass; + this.serializer = serializer; + this.deserializer = deserializer; + } +} + http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/main/java/org/apache/geode/serialization/Serializer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/serialization/Serializer.java b/geode-core/src/main/java/org/apache/geode/serialization/Serializer.java new file mode 100644 index 0000000..6be877f --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/serialization/Serializer.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.serialization; + +public interface Serializer { + public abstract byte[] serialize(T item); +} http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java index 6fcdf4c..0272285 100644 --- a/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java @@ -593,7 +593,10 @@ public class PdxClientServerDUnitTest extends JUnit4CacheTestCase { } private int createServerRegion(final Class constraintClass) throws IOException { - CacheFactory cf = new CacheFactory(getDistributedSystemProperties()); + Properties p = getDistributedSystemProperties(); + p.put("log-level", "debug"); + CacheFactory cf = new CacheFactory(); + Cache cache = getCache(cf); RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE); rf.setValueConstraint(constraintClass); http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/gradle/rat.gradle ---------------------------------------------------------------------- diff --git a/gradle/rat.gradle b/gradle/rat.gradle index f8018b6..7e1d61d 100644 --- a/gradle/rat.gradle +++ b/gradle/rat.gradle @@ -120,11 +120,7 @@ rat { 'geode-core/src/test/resources/org/apache/geode/management/internal/configuration/domain/CacheElementJUnitTest.xml', 'geode-core/src/test/resources/org/apache/geode/management/internal/configuration/utils/*.xml', - '**/META-INF/services/org.xml.sax.ext.EntityResolver2', - '**/META-INF/services/org.apache.geode.internal.cache.CacheService', - '**/META-INF/services/org.apache.geode.internal.cache.xmlcache.XmlParser', - '**/META-INF/services/org.apache.geode.distributed.ServerLauncherCacheProvider', - '**/META-INF/services/org.springframework.shell.core.CommandMarker', + '**/META-INF/services/*', // --- Other Licenses --- http://git-wip-us.apache.org/repos/asf/geode/blob/511e2a35/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index c0fdb6e..3efc2dc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,6 +16,7 @@ */ rootProject.name = 'geode' +include 'geode-client-protobuf' include 'geode-old-versions' include 'geode-common' include 'geode-json'