Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 34535200CD7 for ; Tue, 27 Jun 2017 07:00:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 332BE160BFD; Tue, 27 Jun 2017 05:00:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 80BC4160BFF for ; Tue, 27 Jun 2017 07:00:21 +0200 (CEST) Received: (qmail 38942 invoked by uid 500); 27 Jun 2017 05:00:20 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 37574 invoked by uid 99); 27 Jun 2017 05:00:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Jun 2017 05:00:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B676FE041D; Tue, 27 Jun 2017 05:00:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhouxj@apache.org To: commits@geode.apache.org Date: Tue, 27 Jun 2017 05:00:58 -0000 Message-Id: <9d437b2507384cb9b0d5075e0bb7308f@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [43/46] geode git commit: merge GEODE-2995, GEODE-3775 into develop with integration test. archived-at: Tue, 27 Jun 2017 05:00:23 -0000 merge GEODE-2995, GEODE-3775 into develop with integration test. 
Signed-off-by: Brian Rowe Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/db11ebc1 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/db11ebc1 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/db11ebc1 Branch: refs/heads/feature/GEM-1483 Commit: db11ebc1efdf2371bdf1566a7828dc08ea621e2c Parents: e62e825 Author: Galen OSullivan Authored: Wed Jun 21 16:53:03 2017 -0700 Committer: Hitesh Khamesra Committed: Mon Jun 26 09:26:23 2017 -0700 ---------------------------------------------------------------------- .../sockets/ClientProtocolMessageHandler.java | 18 +-- .../tier/sockets/ServerConnectionFactory.java | 39 +++++- .../sockets/ServiceLoadingFailureException.java | 35 ++++++ .../ServerConnectionFactoryIntegrationTest.java | 67 ---------- .../sockets/ServerConnectionFactoryTest.java | 43 ++++++- .../protobuf/ProtobufStreamProcessor.java | 15 ++- ...he.tier.sockets.ClientProtocolMessageHandler | 1 + .../RoundTripCacheConnectionJUnitTest.java | 123 +++++++++++++++++++ 8 files changed, 256 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/db11ebc1/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java index 702609d..32e9e4b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java @@ -22,14 +22,14 @@ import java.io.InputStream; import java.io.OutputStream; /** - * Stub, this will be 
hooked up to the new client protocol when it's implemented. + * This is an interface that other modules can implement to hook into + * {@link GenericProtocolServerConnection} to handle messages sent to Geode. + * + * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode + * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of + * {@link GenericProtocolServerConnection}. */ -public class ClientProtocolMessageHandler { - /** - * Bogus, but it lets us write an integration test so that nobody breaks our flow. - */ - public void receiveMessage(InputStream inputStream, OutputStream outputStream, - InternalCache cache) throws IOException { - outputStream.write(inputStream.read()); - } +public interface ClientProtocolMessageHandler { + void receiveMessage(InputStream inputStream, OutputStream outputStream, InternalCache cache) + throws IOException; } http://git-wip-us.apache.org/repos/asf/geode/blob/db11ebc1/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java index e4746a7..ad13b78 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java @@ -22,14 +22,46 @@ import org.apache.geode.internal.security.SecurityService; import java.io.IOException; import java.net.Socket; +import java.util.Iterator; +import java.util.ServiceLoader; +import javax.management.ServiceNotFoundException; /** * Creates instances of ServerConnection based on the connection mode provided. 
*/ public class ServerConnectionFactory { - // TODO: implement ClientProtocolMessageHandler. - private static final ClientProtocolMessageHandler protobufProtocolHandler = - new ClientProtocolMessageHandler(); + private static ClientProtocolMessageHandler protobufProtocolHandler; + private static final Object protocolLoadLock = new Object(); + + private static ClientProtocolMessageHandler findClientProtocolMessageHandler() { + if (protobufProtocolHandler != null) { + return protobufProtocolHandler; + } + + synchronized (protocolLoadLock) { + if (protobufProtocolHandler != null) { + return protobufProtocolHandler; + } + + ServiceLoader<ClientProtocolMessageHandler> loader = + ServiceLoader.load(ClientProtocolMessageHandler.class); + Iterator<ClientProtocolMessageHandler> iterator = loader.iterator(); + + if (!iterator.hasNext()) { + throw new ServiceLoadingFailureException( + "ClientProtocolMessageHandler implementation not found in JVM"); + } + + ClientProtocolMessageHandler returnValue = iterator.next(); + + if (iterator.hasNext()) { + throw new ServiceLoadingFailureException( + "Multiple service implementations found for ClientProtocolMessageHandler"); + } + + return returnValue; + } + } public static ServerConnection makeServerConnection(Socket s, InternalCache c, CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize, @@ -39,6 +71,7 @@ public class ServerConnectionFactory { if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) { throw new IOException("Acceptor received unknown communication mode: " + communicationMode); } else { + protobufProtocolHandler = findClientProtocolMessageHandler(); return new GenericProtocolServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode, acceptor, protobufProtocolHandler, securityService); http://git-wip-us.apache.org/repos/asf/geode/blob/db11ebc1/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java 
---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java new file mode 100644 index 0000000..be39672 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.GemFireException; + +/** + * Indicates that an error has happened loading a necessary service. 
+ */ +public class ServiceLoadingFailureException extends GemFireException { + public ServiceLoadingFailureException(String message) { + super(message); + } + + public ServiceLoadingFailureException(Exception cause) { + super(cause); + } + + public ServiceLoadingFailureException(String message, Exception cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/db11ebc1/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java deleted file mode 100644 index 9a1509f..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.geode.internal.cache.tier.sockets; - -import static org.junit.Assert.assertEquals; - -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.internal.AvailablePortHelper; -import org.apache.geode.internal.cache.tier.Acceptor; -import org.apache.geode.test.junit.categories.IntegrationTest; -import org.awaitility.Awaitility; -import org.junit.Rule; -import org.junit.Test; -import org.junit.contrib.java.lang.system.RestoreSystemProperties; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.net.Socket; -import java.util.concurrent.TimeUnit; - -/** - * Test that switching on the header byte makes instances of - * {@link GenericProtocolServerConnection}. - */ -@Category(IntegrationTest.class) -public class ServerConnectionFactoryIntegrationTest { - - @Rule - public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); - - @Test - public void testNewProtocolHeaderLeadsToNewProtocolServerConnection() throws IOException { - System.setProperty("geode.feature-protobuf-protocol", "true"); - - CacheFactory cacheFactory = new CacheFactory(); - cacheFactory.set("mcast-port","0"); //sometimes it isn't due to other tests. 
- Cache cache = cacheFactory.create(); - - CacheServer cacheServer = cache.addCacheServer(); - final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); - cacheServer.setPort(cacheServerPort); - cacheServer.start(); - - Socket socket = new Socket("localhost", cacheServerPort); - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); - socket.getOutputStream().write(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL); - socket.getOutputStream().write(222); - assertEquals(222, socket.getInputStream().read()); - - cache.close(); - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/db11ebc1/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java index 11b5289..4e994cd 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java @@ -31,6 +31,14 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * We don't test the path where the service providing protobufProtocolHandler is actually present, + * because it lives outside this module, and all the integration tests from that module will test + * the newclient protocol happy path. + * + * What we are concerned with is making sure that everything stays the same when the feature flag + * isn't set, and that we at least try to load the service when the feature flag is true. 
+ */ @Category(UnitTest.class) public class ServerConnectionFactoryTest { /** @@ -38,17 +46,22 @@ public class ServerConnectionFactoryTest { * enabled. */ @Test(expected = IOException.class) - public void newClientProtocolThrows() throws Exception { - serverConnectionMockedExceptForCommunicationMode(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL); + public void newClientProtocolFailsWithoutSystemPropertySet() throws Exception { + ServerConnection serverConnection = + serverConnectionMockedExceptForCommunicationMode(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL); + } - @Test - public void newClientProtocolSucceedsWithSystemPropertySet() throws Exception { + /** + * @throws ServiceLoadingFailureException because the service is implemented in a different + * module, and when this unit test is run, that module won't be present. + */ + @Test(expected = ServiceLoadingFailureException.class) + public void newClientProtocolFailsWithSystemPropertySet() throws Exception { try { System.setProperty("geode.feature-protobuf-protocol", "true"); ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode( Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL); - assertTrue(serverConnection instanceof GenericProtocolServerConnection); } finally { System.clearProperty("geode.feature-protobuf-protocol"); } @@ -69,6 +82,26 @@ public class ServerConnectionFactoryTest { } } + @Test + public void makeServerConnectionForOldProtocolWithFeatureFlagEnabled() throws IOException { + System.setProperty("geode.feature-protobuf-protocol", "true"); + try { + byte[] communicationModes = + new byte[] {Acceptor.CLIENT_TO_SERVER, Acceptor.PRIMARY_SERVER_TO_CLIENT, + Acceptor.SECONDARY_SERVER_TO_CLIENT, Acceptor.GATEWAY_TO_GATEWAY, + Acceptor.MONITOR_TO_SERVER, Acceptor.SUCCESSFUL_SERVER_TO_CLIENT, + Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT, Acceptor.CLIENT_TO_SERVER_FOR_QUEUE,}; + + for (byte communicationMode : communicationModes) { + ServerConnection serverConnection = + 
serverConnectionMockedExceptForCommunicationMode(communicationMode); + assertTrue(serverConnection instanceof LegacyServerConnection); + } + } finally { + System.clearProperty("geode.feature-protobuf-protocol"); + } + } + private static ServerConnection serverConnectionMockedExceptForCommunicationMode( byte communicationMode) throws IOException { Socket socketMock = mock(Socket.class); http://git-wip-us.apache.org/repos/asf/geode/blob/db11ebc1/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java index 1dcb61c..d7b5d4b 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java @@ -15,6 +15,8 @@ package org.apache.geode.protocol.protobuf; import org.apache.geode.cache.Cache; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler; import org.apache.geode.protocol.exception.InvalidProtocolMessageException; import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer; import org.apache.geode.protocol.operations.registry.OperationsHandlerRegistry; @@ -34,7 +36,7 @@ import java.io.OutputStream; * messages, hands the requests to an appropriate handler, wraps the response in a protobuf message, * and then pushes it to the output stream. 
*/ -public class ProtobufStreamProcessor { +public class ProtobufStreamProcessor implements ClientProtocolMessageHandler { ProtobufProtocolSerializer protobufProtocolSerializer; OperationsHandlerRegistry registry; ProtobufSerializationService protobufSerializationService; @@ -60,4 +62,15 @@ public class ProtobufStreamProcessor { ProtobufUtilities.wrapResponseWithDefaultHeader(response); protobufProtocolSerializer.serialize(responseMessage, outputStream); } + + @Override + public void receiveMessage(InputStream inputStream, OutputStream outputStream, + InternalCache cache) throws IOException { + try { + processOneMessage(inputStream, outputStream, cache); + } catch (InvalidProtocolMessageException | OperationHandlerNotRegisteredException + | TypeEncodingException e) { + throw new IOException(e); + } + } } http://git-wip-us.apache.org/repos/asf/geode/blob/db11ebc1/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler new file mode 100644 index 0000000..5a17eda --- /dev/null +++ b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler @@ -0,0 +1 @@ +org.apache.geode.protocol.protobuf.ProtobufStreamProcessor \ No newline at end of file http://git-wip-us.apache.org/repos/asf/geode/blob/db11ebc1/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java 
b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java new file mode 100644 index 0000000..b9faca3 --- /dev/null +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.geode.protocol; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.ByteString; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.ProtobufSerializationService; +import org.apache.geode.protocol.protobuf.RegionAPI; +import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer; +import org.apache.geode.serialization.codec.StringCodec; +import org.apache.geode.test.junit.categories.IntegrationTest; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.RestoreSystemProperties; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.TimeUnit; + +/** + * Test that switching on the header byte makes instances of + * {@link GenericProtocolServerConnection}. 
+ */ +@Category(IntegrationTest.class) +public class RoundTripCacheConnectionJUnitTest { + public static final String TEST_KEY = "testKey"; + public static final String TEST_VALUE = "testValue"; + public static final String TEST_REGION = "testRegion"; + + private Cache cache; + private int cacheServerPort; + + @Rule + public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + + @Before + public void setup() throws IOException { + CacheFactory cacheFactory = new CacheFactory(); + cacheFactory.set("mcast-port", "0"); // sometimes it isn't due to other tests. + cache = cacheFactory.create(); + + CacheServer cacheServer = cache.addCacheServer(); + cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); + cacheServer.setPort(cacheServerPort); + cacheServer.start(); + + RegionFactory regionFactory = cache.createRegionFactory(); + Region testRegion = regionFactory.create(TEST_REGION); + testRegion.put(TEST_KEY, TEST_VALUE); + } + + @After + public void cleanup() { + cache.close(); + } + + @Test + public void testNewProtocolHeaderLeadsToNewProtocolServerConnection() throws Exception { + System.setProperty("geode.feature-protobuf-protocol", "true"); + + Socket socket = new Socket("localhost", cacheServerPort); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + OutputStream outputStream = socket.getOutputStream(); + outputStream.write(110); + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + protobufProtocolSerializer.serialize(makeTestGetRequest(new StringCodec()), outputStream); + + ClientProtocol.Message message = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase()); + ClientProtocol.Response response = message.getResponse(); + assertEquals(ClientProtocol.Response.ResponseAPICase.GETRESPONSE, + response.getResponseAPICase()); + 
RegionAPI.GetResponse getResponse = response.getGetResponse(); + BasicTypes.EncodedValue result = getResponse.getResult(); + assertEquals(BasicTypes.EncodingType.STRING, result.getEncodingType()); + assertEquals(TEST_VALUE, new ProtobufSerializationService().decode(result.getEncodingType(), + result.getValue().toByteArray())); + } + + private ClientProtocol.Message makeTestGetRequest(StringCodec stringCodec) { + RegionAPI.GetRequest.Builder getRequestBuilder = RegionAPI.GetRequest.newBuilder(); + getRequestBuilder.setRegionName(TEST_REGION) + .setKey(BasicTypes.EncodedValue.newBuilder().setEncodingType(BasicTypes.EncodingType.STRING) + .setValue(ByteString.copyFrom(stringCodec.encode(TEST_KEY)))); + ClientProtocol.Request request = + ClientProtocol.Request.newBuilder().setGetRequest(getRequestBuilder).build(); + ClientProtocol.Message requestMessage = ClientProtocol.Message.newBuilder() + .setMessageHeader(ClientProtocol.MessageHeader.newBuilder()).setRequest(request).build(); + + return requestMessage; + } +}