Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EA84C200D3B for ; Thu, 26 Oct 2017 19:01:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E92D21609E5; Thu, 26 Oct 2017 17:01:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7FD23160BF9 for ; Thu, 26 Oct 2017 19:01:55 +0200 (CEST) Received: (qmail 75473 invoked by uid 500); 26 Oct 2017 17:01:54 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 75118 invoked by uid 99); 26 Oct 2017 17:01:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Oct 2017 17:01:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1E3A9DFC2E; Thu, 26 Oct 2017 17:01:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kkloudas@apache.org To: commits@flink.apache.org Date: Thu, 26 Oct 2017 17:02:01 -0000 Message-Id: <307cd057fba44143a8cadaa56a68c23c@git.apache.org> In-Reply-To: <665226b2a6b540509708f0b89f275eb8@git.apache.org> References: <665226b2a6b540509708f0b89f275eb8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/13] flink git commit: [FLINK-7908][QS] Restructure the queryable state module. archived-at: Thu, 26 Oct 2017 17:01:58 -0000 [FLINK-7908][QS] Restructure the queryable state module. The QS module is split into core and client. The core should be put in the lib folder to enable queryable state, while the client is the one that the user will program against. The reason for the restructuring in mainly to remove the dependency on the flink-runtime from the user's program. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c771505 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c771505 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c771505 Branch: refs/heads/master Commit: 0c771505b84cdacf7a359c3be0efe38a30f9e660 Parents: 8595dad Author: kkloudas Authored: Tue Oct 24 12:16:08 2017 +0200 Committer: kkloudas Committed: Thu Oct 26 18:57:34 2017 +0200 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 2 +- .../streaming/state/RocksDBMapState.java | 2 +- .../flink-queryable-state-client-java/pom.xml | 90 ++ .../flink/queryablestate/FutureUtils.java | 43 + .../apache/flink/queryablestate/KvStateID.java | 40 + .../client/QueryableStateClient.java | 257 +++ .../queryablestate/client/VoidNamespace.java | 75 + .../client/VoidNamespaceSerializer.java | 96 ++ .../client/VoidNamespaceTypeInfo.java | 92 ++ .../client/state/ImmutableAggregatingState.java | 71 + .../client/state/ImmutableFoldingState.java | 70 + .../client/state/ImmutableListState.java | 70 + .../client/state/ImmutableMapState.java | 139 ++ .../client/state/ImmutableReducingState.java | 69 + .../client/state/ImmutableState.java | 29 + .../client/state/ImmutableStateBinder.java | 80 + .../client/state/ImmutableValueState.java | 69 + .../serialization/DataInputDeserializer.java | 392 +++++ .../serialization/DataOutputSerializer.java | 344 ++++ .../state/serialization/KvStateSerializer.java | 265 ++++ .../exceptions/UnknownJobManagerException.java | 36 + .../UnknownKeyOrNamespaceException.java | 39 + .../exceptions/UnknownKvStateIdException.java | 42 + ...UnknownKvStateKeyGroupLocationException.java | 39 + .../queryablestate/messages/KvStateRequest.java | 140 ++ .../messages/KvStateResponse.java | 74 + .../network/AbstractServerBase.java | 308 ++++ .../network/AbstractServerHandler.java | 305 ++++ .../network/BadRequestException.java | 35 + .../queryablestate/network/ChunkedByteBuf.java | 100 ++ .../flink/queryablestate/network/Client.java | 536 +++++++ .../queryablestate/network/ClientHandler.java | 122 ++ .../network/ClientHandlerCallback.java | 56 + .../queryablestate/network/NettyBufferPool.java | 171 ++ .../network/messages/MessageBody.java | 38 + .../network/messages/MessageDeserializer.java | 39 + .../network/messages/MessageSerializer.java | 320 ++++ .../network/messages/MessageType.java | 42 + .../network/messages/RequestFailure.java | 71 + .../stats/AtomicKvStateRequestStats.java | 104 ++ .../stats/DisabledKvStateRequestStats.java | 45 + .../network/stats/KvStateRequestStats.java | 54 + .../client/VoidNamespaceTypeInfoTest.java | 32 + .../state/ImmutableAggregatingStateTest.java | 107 ++ .../client/state/ImmutableFoldingStateTest.java | 93 ++ .../client/state/ImmutableListStateTest.java | 110 ++ .../client/state/ImmutableMapStateTest.java | 188 +++ .../state/ImmutableReducingStateTest.java | 83 + .../client/state/ImmutableValueStateTest.java | 69 + .../src/test/resources/log4j-test.properties | 31 + .../flink-queryable-state-java/pom.xml | 137 -- .../UnknownJobManagerException.java | 36 - .../UnknownKeyOrNamespaceException.java | 39 - .../UnknownKvStateIdException.java | 42 - ...UnknownKvStateKeyGroupLocationException.java | 41 - .../client/QueryableStateClient.java | 260 --- .../client/proxy/KvStateClientProxyHandler.java | 225 --- .../client/proxy/KvStateClientProxyImpl.java | 128 -- .../client/state/ImmutableAggregatingState.java | 71 - .../client/state/ImmutableFoldingState.java | 70 - .../client/state/ImmutableListState.java | 70 - .../client/state/ImmutableMapState.java | 139 -- .../client/state/ImmutableReducingState.java | 69 - .../client/state/ImmutableState.java | 29 - .../client/state/ImmutableStateBinder.java | 80 - .../client/state/ImmutableValueState.java | 69 - .../messages/KvStateInternalRequest.java | 93 -- .../queryablestate/messages/KvStateRequest.java | 141 -- .../messages/KvStateResponse.java | 75 - .../network/AbstractServerBase.java | 310 ---- .../network/AbstractServerHandler.java | 306 ---- .../network/BadRequestException.java | 35 - .../queryablestate/network/ChunkedByteBuf.java | 100 -- .../flink/queryablestate/network/Client.java | 537 ------- .../queryablestate/network/ClientHandler.java | 122 -- .../network/ClientHandlerCallback.java | 56 - .../network/messages/MessageBody.java | 38 - .../network/messages/MessageDeserializer.java | 39 - .../network/messages/MessageSerializer.java | 320 ---- .../network/messages/MessageType.java | 42 - .../network/messages/RequestFailure.java | 71 - .../server/KvStateServerHandler.java | 107 -- .../server/KvStateServerImpl.java | 111 -- .../itcases/AbstractQueryableStateTestBase.java | 1496 ------------------ .../HAAbstractQueryableStateTestBase.java | 98 -- .../HAQueryableStateFsBackendITCase.java | 45 - .../HAQueryableStateRocksDBBackendITCase.java | 47 - .../KVStateRequestSerializerRocksDBTest.java | 167 -- .../NonHAAbstractQueryableStateTestBase.java | 78 - .../NonHAQueryableStateFsBackendITCase.java | 45 - ...NonHAQueryableStateRocksDBBackendITCase.java | 47 - .../network/AbstractServerTest.java | 219 --- .../queryablestate/network/ClientTest.java | 784 --------- .../network/KvStateClientHandlerTest.java | 119 -- .../network/KvStateServerHandlerTest.java | 758 --------- .../network/KvStateServerTest.java | 207 --- .../network/MessageSerializerTest.java | 220 --- .../state/ImmutableAggregatingStateTest.java | 108 -- .../state/ImmutableFoldingStateTest.java | 94 -- .../state/ImmutableListStateTest.java | 112 -- .../state/ImmutableMapStateTest.java | 189 --- .../state/ImmutableReducingStateTest.java | 84 - .../state/ImmutableValueStateTest.java | 70 - .../src/test/resources/log4j-test.properties | 31 - .../flink-queryable-state-runtime/pom.xml | 119 ++ .../client/proxy/KvStateClientProxyHandler.java | 225 +++ .../client/proxy/KvStateClientProxyImpl.java | 128 ++ .../messages/KvStateInternalRequest.java | 93 ++ .../server/KvStateServerHandler.java | 107 ++ .../server/KvStateServerImpl.java | 111 ++ .../itcases/AbstractQueryableStateTestBase.java | 1496 ++++++++++++++++++ .../HAAbstractQueryableStateTestBase.java | 98 ++ .../HAQueryableStateFsBackendITCase.java | 45 + .../HAQueryableStateRocksDBBackendITCase.java | 47 + .../KVStateRequestSerializerRocksDBTest.java | 167 ++ .../NonHAAbstractQueryableStateTestBase.java | 78 + .../NonHAQueryableStateFsBackendITCase.java | 45 + ...NonHAQueryableStateRocksDBBackendITCase.java | 47 + .../network/AbstractServerTest.java | 219 +++ .../queryablestate/network/ClientTest.java | 782 +++++++++ .../network/KvStateClientHandlerTest.java | 119 ++ .../network/KvStateRequestSerializerTest.java | 416 +++++ .../network/KvStateServerHandlerTest.java | 758 +++++++++ .../network/KvStateServerTest.java | 212 +++ .../network/MessageSerializerTest.java | 220 +++ .../src/test/resources/log4j-test.properties | 31 + flink-queryable-state/pom.xml | 5 +- flink-runtime/pom.xml | 6 + .../flink/runtime/jobmaster/JobMaster.java | 6 +- .../runtime/jobmaster/JobMasterGateway.java | 6 +- .../apache/flink/runtime/query/KvStateID.java | 41 - .../flink/runtime/query/KvStateLocation.java | 14 +- .../runtime/query/KvStateLocationRegistry.java | 4 +- .../flink/runtime/query/KvStateMessage.java | 10 +- .../flink/runtime/query/KvStateRegistry.java | 1 + .../runtime/query/KvStateRegistryGateway.java | 5 +- .../runtime/query/KvStateRegistryListener.java | 1 + .../flink/runtime/query/KvStateServer.java | 6 +- .../runtime/query/KvStateServerAddress.java | 95 -- .../runtime/query/QueryableStateUtils.java | 8 +- .../runtime/query/TaskKvStateRegistry.java | 1 + .../query/netty/AtomicKvStateRequestStats.java | 104 -- .../netty/DisabledKvStateRequestStats.java | 45 - .../query/netty/KvStateRequestStats.java | 55 - .../query/netty/message/KvStateSerializer.java | 267 ---- .../runtime/state/heap/AbstractHeapState.java | 6 +- .../flink/runtime/state/heap/HeapMapState.java | 2 +- .../taskexecutor/TaskManagerServices.java | 2 +- .../rpc/RpcKvStateRegistryListener.java | 9 +- .../ActorGatewayKvStateRegistryListener.java | 11 +- .../runtime/jobmanager/JobManagerTest.java | 12 +- .../query/KvStateLocationRegistryTest.java | 10 +- .../runtime/query/KvStateLocationTest.java | 8 +- .../message/KvStateRequestSerializerTest.java | 415 ----- .../runtime/state/StateBackendTestBase.java | 4 +- .../completeness/TypeInfoTestCoverageTest.java | 3 +- pom.xml | 2 +- tools/travis_mvn_watchdog.sh | 3 +- 158 files changed, 11373 insertions(+), 10107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index cf365b4..969a1fc 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -27,7 +27,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.Preconditions; http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 421bb2e..e8c34cc 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -26,7 +26,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.Preconditions; http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/pom.xml b/flink-queryable-state/flink-queryable-state-client-java/pom.xml new file mode 100644 index 0000000..8a4ff69 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/pom.xml @@ -0,0 +1,90 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-queryable-state + 1.4-SNAPSHOT + .. + + + flink-queryable-state-client-java_${scala.binary.version} + flink-queryable-state-client-java + jar + + + + + + + org.apache.flink + flink-core + ${project.version} + provided + + + + org.apache.flink + flink-shaded-netty + + + + org.apache.flink + flink-shaded-guava + + + + org.apache.flink + flink-core + ${project.version} + test + test-jar + + + + org.apache.flink + flink-test-utils-junit + ${project.version} + test + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/FutureUtils.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/FutureUtils.java new file mode 100644 index 0000000..e2af7b1 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/FutureUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate; + +import java.util.concurrent.CompletableFuture; + +/** + * Utility class for {@link java.util.concurrent.Future Java Futures}. + */ +public class FutureUtils { + + // ------------------------------------------------------------------------ + // Future Completed with an exception. + // ------------------------------------------------------------------------ + + /** + * Returns a {@link CompletableFuture} that has failed with the exception + * provided as argument. + * @param throwable the exception to fail the future with. + * @return The failed future. + */ + public static CompletableFuture getFailedFuture(Throwable throwable) { + CompletableFuture failedAttempt = new CompletableFuture<>(); + failedAttempt.completeExceptionally(throwable); + return failedAttempt; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/KvStateID.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/KvStateID.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/KvStateID.java new file mode 100644 index 0000000..992b283 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/KvStateID.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate; + +import org.apache.flink.util.AbstractID; + +/** + * Identifier for state instances. + * + *

Assigned when registering the state at the state registry. + */ +public class KvStateID extends AbstractID { + + private static final long serialVersionUID = 1L; + + public KvStateID() { + super(); + } + + public KvStateID(long lowerPart, long upperPart) { + super(lowerPart, upperPart); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java new file mode 100644 index 0000000..304505a --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.queryablestate.FutureUtils; +import org.apache.flink.queryablestate.client.state.ImmutableStateBinder; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; +import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.Client; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.concurrent.CompletableFuture; + +/** + * Client for querying Flink's managed state. + * + *

You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}. + * The state instance created from this descriptor will be published for queries when it's + * created on the Task Managers and the location will be reported to the Job Manager. + * + *

The client connects to a {@code Client Proxy} running on a given Task Manager. The + * proxy is the entry point of the client to the Flink cluster. It forwards the requests + * of the client to the Job Manager and the required Task Manager, and forwards the final + * response back the client. + * + *

The proxy, initially resolves the location of the requested KvState via the JobManager. Resolved + * locations are cached. When the server address of the requested KvState instance is determined, the + * client sends out a request to the server. The returned final answer is then forwarded to the Client. + */ +@PublicEvolving +public class QueryableStateClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class); + + /** The client that forwards the requests to the proxy. */ + private final Client client; + + /** The address of the proxy this client is connected to. */ + private final InetSocketAddress remoteAddress; + + /** The execution configuration used to instantiate the different (de-)serializers. */ + private ExecutionConfig executionConfig; + + /** + * Create the Queryable State Client. + * @param remoteHostname the hostname of the {@code Client Proxy} to connect to. + * @param remotePort the port of the proxy to connect to. + */ + public QueryableStateClient(final String remoteHostname, final int remotePort) throws UnknownHostException { + this(InetAddress.getByName(Preconditions.checkNotNull(remoteHostname)), remotePort); + } + + /** + * Create the Queryable State Client. + * @param remoteAddress the {@link InetAddress address} of the {@code Client Proxy} to connect to. + * @param remotePort the port of the proxy to connect to. + */ + public QueryableStateClient(final InetAddress remoteAddress, final int remotePort) { + Preconditions.checkArgument(remotePort >= 0 && remotePort <= 65536, + "Remote Port " + remotePort + " is out of valid port range (0-65536)."); + + this.remoteAddress = new InetSocketAddress(remoteAddress, remotePort); + + final MessageSerializer messageSerializer = + new MessageSerializer<>( + new KvStateRequest.KvStateRequestDeserializer(), + new KvStateResponse.KvStateResponseDeserializer()); + + this.client = new Client<>( + "Queryable State Client", + 1, + messageSerializer, + new DisabledKvStateRequestStats()); + } + + /** Shuts down the client. */ + public void shutdown() { + client.shutdown(); + } + + /** + * Gets the {@link ExecutionConfig}. + */ + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + /** + * Replaces the existing {@link ExecutionConfig} (possibly {@code null}), with the provided one. + * @param config The new {@code configuration}. + * @return The old configuration, or {@code null} if none was specified. + * */ + public ExecutionConfig setExecutionConfig(ExecutionConfig config) { + ExecutionConfig prev = executionConfig; + this.executionConfig = config; + return prev; + } + + /** + * Returns a future holding the request result. * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key we are interested in. + * @param keyTypeHint A {@link TypeHint} used to extract the type of the key. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the immutable {@link State} object containing the result. + */ + @PublicEvolving + public CompletableFuture getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final TypeHint keyTypeHint, + final StateDescriptor stateDescriptor) { + + Preconditions.checkNotNull(keyTypeHint); + + TypeInformation keyTypeInfo = keyTypeHint.getTypeInfo(); + return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor); + } + + /** + * Returns a future holding the request result. * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key we are interested in. + * @param keyTypeInfo The {@link TypeInformation} of the key. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the immutable {@link State} object containing the result. + */ + @PublicEvolving + public CompletableFuture getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final TypeInformation keyTypeInfo, + final StateDescriptor stateDescriptor) { + + return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, + keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); + } + + /** + * Returns a future holding the request result. + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key that the state we request is associated with. + * @param namespace The namespace of the state. + * @param keyTypeInfo The {@link TypeInformation} of the keys. + * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the immutable {@link State} object containing the result. + */ + @PublicEvolving + public CompletableFuture getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final N namespace, + final TypeInformation keyTypeInfo, + final TypeInformation namespaceTypeInfo, + final StateDescriptor stateDescriptor) { + + Preconditions.checkNotNull(jobId); + Preconditions.checkNotNull(queryableStateName); + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(namespace); + + Preconditions.checkNotNull(keyTypeInfo); + Preconditions.checkNotNull(namespaceTypeInfo); + Preconditions.checkNotNull(stateDescriptor); + + TypeSerializer keySerializer = keyTypeInfo.createSerializer(executionConfig); + TypeSerializer namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); + + stateDescriptor.initializeSerializerUnlessSet(executionConfig); + + final byte[] serializedKeyAndNamespace; + try { + serializedKeyAndNamespace = KvStateSerializer + .serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer); + } catch (IOException e) { + return FutureUtils.getFailedFuture(e); + } + + return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply( + stateResponse -> { + try { + return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent())); + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + }); + } + + /** + * Returns a future holding the serialized request result. + * + * @param jobId JobID of the job the queryable state + * belongs to + * @param queryableStateName Name under which the state is queryable + * @param keyHashCode Integer hash code of the key (result of + * a call to {@link Object#hashCode()} + * @param serializedKeyAndNamespace Serialized key and namespace to query + * KvState instance with + * @return Future holding the serialized result + */ + private CompletableFuture getKvState( + final JobID jobId, + final String queryableStateName, + final int keyHashCode, + final byte[] serializedKeyAndNamespace) { + LOG.info("Sending State Request to {}.", remoteAddress); + try { + KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace); + return client.sendRequest(remoteAddress, request); + } catch (Exception e) { + LOG.error("Unable to send KVStateRequest: ", e); + return FutureUtils.getFailedFuture(e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespace.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespace.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespace.java new file mode 100644 index 0000000..0560ec2 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespace.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.annotation.Internal; + +import java.io.ObjectStreamException; + +/** + * Singleton placeholder class for state without a namespace. + * + *

THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY. + */ +@Internal +public final class VoidNamespace { + + // ------------------------------------------------------------------------ + // Singleton instance. + // ------------------------------------------------------------------------ + + /** The singleton instance. */ + public static final VoidNamespace INSTANCE = new VoidNamespace(); + + /** Getter for the singleton instance. */ + public static VoidNamespace get() { + return INSTANCE; + } + + /** This class should not be instantiated. */ + private VoidNamespace() {} + + // ------------------------------------------------------------------------ + // Standard Utilities + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return 99; + } + + @Override + public boolean equals(Object obj) { + return obj == this; + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + + // ------------------------------------------------------------------------ + // Singleton serialization + // ------------------------------------------------------------------------ + + // make sure that we preserve the singleton properly on serialization + private Object readResolve() throws ObjectStreamException { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java new file mode 100644 index 0000000..38db705 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** + * Serializer for {@link VoidNamespace}. + * + *

THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY. + */ +@Internal +public final class VoidNamespaceSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = 1L; + + public static final VoidNamespaceSerializer INSTANCE = new VoidNamespaceSerializer(); + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public VoidNamespace createInstance() { + return VoidNamespace.get(); + } + + @Override + public VoidNamespace copy(VoidNamespace from) { + return VoidNamespace.get(); + } + + @Override + public VoidNamespace copy(VoidNamespace from, VoidNamespace reuse) { + return VoidNamespace.get(); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(VoidNamespace record, DataOutputView target) throws IOException { + // Make progress in the stream, write one byte. + // + // We could just skip writing anything here, because of the way this is + // used with the state backends, but if it is ever used somewhere else + // (even though it is unlikely to happen), it would be a problem. + target.write(0); + } + + @Override + public VoidNamespace deserialize(DataInputView source) throws IOException { + source.readByte(); + return VoidNamespace.get(); + } + + @Override + public VoidNamespace deserialize(VoidNamespace reuse, DataInputView source) throws IOException { + source.readByte(); + return VoidNamespace.get(); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + target.write(source.readByte()); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof VoidNamespaceSerializer; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java new file mode 100644 index 0000000..2efb87b --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * {@link TypeInformation} for {@link VoidNamespace}. + * + *

THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY. + */ +@Internal +public class VoidNamespaceTypeInfo extends TypeInformation { + + private static final long serialVersionUID = 5453679706408610586L; + + public static final VoidNamespaceTypeInfo INSTANCE = new VoidNamespaceTypeInfo(); + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return 0; + } + + @Override + public Class getTypeClass() { + return VoidNamespace.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer createSerializer(ExecutionConfig config) { + return VoidNamespaceSerializer.INSTANCE; + } + + @Override + public String toString() { + return "VoidNamespaceTypeInfo"; + } + + @Override + public boolean equals(Object obj) { + return this == obj || obj instanceof VoidNamespaceTypeInfo; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof VoidNamespaceTypeInfo; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java new file mode 100644 index 0000000..8964fbf --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link AggregatingState} that does not allow for modifications. + * + *

This is the type of the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link AggregatingStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableAggregatingState extends ImmutableState implements AggregatingState { + + private final OUT value; + + private ImmutableAggregatingState(OUT value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public OUT get() { + return value; + } + + @Override + public void add(Object newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableAggregatingState createState( + final AggregatingStateDescriptor stateDescriptor, + final byte[] serializedValue) throws IOException { + + final ACC accumulator = KvStateSerializer.deserializeValue( + serializedValue, + stateDescriptor.getSerializer()); + + final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator); + return new ImmutableAggregatingState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java new file mode 100644 index 0000000..25f3118 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link FoldingState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link FoldingStateDescriptor}. + */ +@PublicEvolving +@Deprecated +public final class ImmutableFoldingState extends ImmutableState implements FoldingState { + + private final ACC value; + + private ImmutableFoldingState(ACC value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public ACC get() { + return value; + } + + @Override + public void add(Object newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableFoldingState createState( + final FoldingStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final ACC state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableFoldingState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java new file mode 100644 index 0000000..3dcd75d --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; + +/** + * A read-only {@link ListState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ListStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableListState extends ImmutableState implements ListState { + + private final List listState; + + private ImmutableListState(final List state) { + this.listState = Preconditions.checkNotNull(state); + } + + @Override + public Iterable get() { + return listState; + } + + @Override + public void add(V value) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableListState createState( + final ListStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final List state = KvStateSerializer.deserializeList( + serializedState, + stateDescriptor.getElementSerializer()); + return new ImmutableListState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java new file mode 100644 index 0000000..bb08cf0 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * A read-only {@link MapState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link MapStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableMapState extends ImmutableState implements MapState { + + private final Map state; + + private ImmutableMapState(final Map mapState) { + this.state = Preconditions.checkNotNull(mapState); + } + + @Override + public V get(K key) { + return state.get(key); + } + + @Override + public void put(K key, V value) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void putAll(Map map) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void remove(K key) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public boolean contains(K key) { + return state.containsKey(key); + } + + /** + * Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}. + * + * @return A read-only iterable view of all the key-value pairs in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable> entries() { + return Collections.unmodifiableSet(state.entrySet()); + } + + /** + * Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}. + * + * @return A read-only iterable view of all the keys in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable keys() { + return Collections.unmodifiableSet(state.keySet()); + } + + /** + * Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}. + * + * @return A read-only iterable view of all the values in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable values() { + return Collections.unmodifiableCollection(state.values()); + } + + /** + * Iterates over all the mappings in the state. The iterator cannot + * remove elements. + * + * @return A read-only iterator over all the mappings in the state + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterator> iterator() { + return Collections.unmodifiableSet(state.entrySet()).iterator(); + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableMapState createState( + final MapStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final Map state = KvStateSerializer.deserializeMap( + serializedState, + stateDescriptor.getKeySerializer(), + stateDescriptor.getValueSerializer()); + return new ImmutableMapState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java new file mode 100644 index 0000000..46b477f --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link ReducingState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ReducingStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableReducingState extends ImmutableState implements ReducingState { + + private final V value; + + private ImmutableReducingState(V value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public V get() { + return value; + } + + @Override + public void add(V newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableReducingState createState( + final ReducingStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final V state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableReducingState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java new file mode 100644 index 0000000..863f07b --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +/** + * A base class for the read-only types of state returned + * as results from the Queryable State Client. + */ +abstract class ImmutableState { + + protected static final UnsupportedOperationException MODIFICATION_ATTEMPT_ERROR = + new UnsupportedOperationException("State is read-only. No modifications allowed."); +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java new file mode 100644 index 0000000..6ce2787 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateBinder; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.util.Preconditions; + +/** + * A {@link StateBinder} used to deserialize the results returned by the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient}. + * + *

The result is an immutable {@link org.apache.flink.api.common.state.State State} + * object containing the requested result. + */ +public class ImmutableStateBinder implements StateBinder { + + private final byte[] serializedState; + + public ImmutableStateBinder(final byte[] content) { + serializedState = Preconditions.checkNotNull(content); + } + + @Override + public ValueState createValueState(ValueStateDescriptor stateDesc) throws Exception { + return ImmutableValueState.createState(stateDesc, serializedState); + } + + @Override + public ListState createListState(ListStateDescriptor stateDesc) throws Exception { + return ImmutableListState.createState(stateDesc, serializedState); + } + + @Override + public ReducingState createReducingState(ReducingStateDescriptor stateDesc) throws Exception { + return ImmutableReducingState.createState(stateDesc, serializedState); + } + + @Override + public AggregatingState createAggregatingState(AggregatingStateDescriptor stateDesc) throws Exception { + return ImmutableAggregatingState.createState(stateDesc, serializedState); + } + + @Override + public FoldingState createFoldingState(FoldingStateDescriptor stateDesc) throws Exception { + return ImmutableFoldingState.createState(stateDesc, serializedState); + } + + @Override + public MapState createMapState(MapStateDescriptor stateDesc) throws Exception { + return ImmutableMapState.createState(stateDesc, serializedState); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java new file mode 100644 index 0000000..f3ddd2b --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link ValueState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ValueStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableValueState extends ImmutableState implements ValueState { + + private final V value; + + private ImmutableValueState(V value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public V value() { + return value; + } + + @Override + public void update(V newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableValueState createState( + final ValueStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final V state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableValueState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java new file mode 100644 index 0000000..878df85 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state.serialization; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemoryUtils; + +import java.io.EOFException; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * A simple and efficient deserializer for the {@link java.io.DataInput} interface. + * + *

THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY. + */ +public class DataInputDeserializer implements DataInputView, java.io.Serializable { + + private static final long serialVersionUID = 1L; + + // ------------------------------------------------------------------------ + + private byte[] buffer; + + private int end; + + private int position; + + // ------------------------------------------------------------------------ + + public DataInputDeserializer() {} + + public DataInputDeserializer(byte[] buffer) { + setBuffer(buffer, 0, buffer.length); + } + + public DataInputDeserializer(byte[] buffer, int start, int len) { + setBuffer(buffer, start, len); + } + + public DataInputDeserializer(ByteBuffer buffer) { + setBuffer(buffer); + } + + // ------------------------------------------------------------------------ + // Changing buffers + // ------------------------------------------------------------------------ + + public void setBuffer(ByteBuffer buffer) { + if (buffer.hasArray()) { + this.buffer = buffer.array(); + this.position = buffer.arrayOffset() + buffer.position(); + this.end = this.position + buffer.remaining(); + } else if (buffer.isDirect()) { + this.buffer = new byte[buffer.remaining()]; + this.position = 0; + this.end = this.buffer.length; + + buffer.get(this.buffer); + } else { + throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer."); + } + } + + public void setBuffer(byte[] buffer, int start, int len) { + if (buffer == null) { + throw new NullPointerException(); + } + + if (start < 0 || len < 0 || start + len > buffer.length) { + throw new IllegalArgumentException(); + } + + this.buffer = buffer; + this.position = start; + this.end = start + len; + } + + public void releaseArrays() { + this.buffer = null; + } + + // ---------------------------------------------------------------------------------------- + // Data Input + // ---------------------------------------------------------------------------------------- + + public int available() { + if (position < end) { + return end - position; + } else { + return 0; + } + } + + @Override + public boolean readBoolean() throws IOException { + if (this.position < this.end) { + return this.buffer[this.position++] != 0; + } else { + throw new EOFException(); + } + } + + @Override + public byte readByte() throws IOException { + if (this.position < this.end) { + return this.buffer[this.position++]; + } else { + throw new EOFException(); + } + } + + @Override + public char readChar() throws IOException { + if (this.position < this.end - 1) { + return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff)); + } else { + throw new EOFException(); + } + } + + @Override + public double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + @Override + public float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + @Override + public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + if (len >= 0) { + if (off <= b.length - len) { + if (this.position <= this.end - len) { + System.arraycopy(this.buffer, position, b, off, len); + position += len; + } else { + throw new EOFException(); + } + } else { + throw new ArrayIndexOutOfBoundsException(); + } + } else if (len < 0) { + throw new IllegalArgumentException("Length may not be negative."); + } + } + + @Override + public int readInt() throws IOException { + if (this.position >= 0 && this.position < this.end - 3) { + @SuppressWarnings("restriction") + int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position); + if (LITTLE_ENDIAN) { + value = Integer.reverseBytes(value); + } + + this.position += 4; + return value; + } else { + throw new EOFException(); + } + } + + @Override + public String readLine() throws IOException { + if (this.position < this.end) { + // read until a newline is found + StringBuilder bld = new StringBuilder(); + char curr = (char) readUnsignedByte(); + while (position < this.end && curr != '\n') { + bld.append(curr); + curr = (char) readUnsignedByte(); + } + // trim a trailing carriage return + int len = bld.length(); + if (len > 0 && bld.charAt(len - 1) == '\r') { + bld.setLength(len - 1); + } + String s = bld.toString(); + bld.setLength(0); + return s; + } else { + return null; + } + } + + @Override + public long readLong() throws IOException { + if (position >= 0 && position < this.end - 7) { + @SuppressWarnings("restriction") + long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position); + if (LITTLE_ENDIAN) { + value = Long.reverseBytes(value); + } + this.position += 8; + return value; + } else { + throw new EOFException(); + } + } + + @Override + public short readShort() throws IOException { + if (position >= 0 && position < this.end - 1) { + return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff)); + } else { + throw new EOFException(); + } + } + + @Override + public String readUTF() throws IOException { + int utflen = readUnsignedShort(); + byte[] bytearr = new byte[utflen]; + char[] chararr = new char[utflen]; + + int c, char2, char3; + int count = 0; + int chararrCount = 0; + + readFully(bytearr, 0, utflen); + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + if (c > 127) { + break; + } + count++; + chararr[chararrCount++] = (char) c; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + switch (c >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + /* 0xxxxxxx */ + count++; + chararr[chararrCount++] = (char) c; + break; + case 12: + case 13: + /* 110x xxxx 10xx xxxx */ + count += 2; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 1]; + if ((char2 & 0xC0) != 0x80) { + throw new UTFDataFormatException("malformed input around byte " + count); + } + chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 2]; + char3 = (int) bytearr[count - 1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { + throw new UTFDataFormatException("malformed input around byte " + (count - 1)); + } + chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); + break; + default: + /* 10xx xxxx, 1111 xxxx */ + throw new UTFDataFormatException("malformed input around byte " + count); + } + } + // The number of chars produced may be less than utflen + return new String(chararr, 0, chararrCount); + } + + @Override + public int readUnsignedByte() throws IOException { + if (this.position < this.end) { + return (this.buffer[this.position++] & 0xff); + } else { + throw new EOFException(); + } + } + + @Override + public int readUnsignedShort() throws IOException { + if (this.position < this.end - 1) { + return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff); + } else { + throw new EOFException(); + } + } + + @Override + public int skipBytes(int n) throws IOException { + if (this.position <= this.end - n) { + this.position += n; + return n; + } else { + n = this.end - this.position; + this.position = this.end; + return n; + } + } + + @Override + public void skipBytesToRead(int numBytes) throws IOException { + int skippedBytes = skipBytes(numBytes); + + if (skippedBytes < numBytes){ + throw new EOFException("Could not skip " + numBytes + " bytes."); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null){ + throw new NullPointerException("Byte array b cannot be null."); + } + + if (off < 0){ + throw new IndexOutOfBoundsException("Offset cannot be negative."); + } + + if (len < 0){ + throw new IndexOutOfBoundsException("Length cannot be negative."); + } + + if (b.length - off < len){ + throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" + + "."); + } + + if (this.position >= this.end) { + return -1; + } else { + int toRead = Math.min(this.end - this.position, len); + System.arraycopy(this.buffer, this.position, b, off, toRead); + this.position += toRead; + + return toRead; + } + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); +}