Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D8FAE200C45 for ; Tue, 28 Mar 2017 20:05:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D73D7160B6B; Tue, 28 Mar 2017 18:05:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AA185160B89 for ; Tue, 28 Mar 2017 20:05:52 +0200 (CEST) Received: (qmail 94969 invoked by uid 500); 28 Mar 2017 18:05:51 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 94881 invoked by uid 99); 28 Mar 2017 18:05:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Mar 2017 18:05:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 72DA3DFF47; Tue, 28 Mar 2017 18:05:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: srichter@apache.org To: commits@flink.apache.org Date: Tue, 28 Mar 2017 18:05:52 -0000 Message-Id: In-Reply-To: <0ba951d258bc40d9aa099a8cc854450a@git.apache.org> References: <0ba951d258bc40d9aa099a8cc854450a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-6034] [checkpoints] Introduce KeyedStateHandle abstraction for the snapshots in keyed streams archived-at: Tue, 28 Mar 2017 18:05:55 -0000 [FLINK-6034] [checkpoints] Introduce KeyedStateHandle abstraction for the snapshots in keyed streams Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd552741 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd552741 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd552741 Branch: refs/heads/master Commit: cd5527417a1cae57073a8855c6c3b88c88c780aa Parents: 89866a5 Author: xiaogang.sxg Authored: Thu Mar 23 23:32:15 2017 +0800 Committer: Stefan Richter Committed: Tue Mar 28 20:05:28 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 46 ++++++++++---- .../state/RocksDBAsyncSnapshotTest.java | 3 +- .../state/RocksDBStateBackendTest.java | 21 ++++--- .../cep/operator/CEPMigration12to13Test.java | 14 ++--- .../apache/flink/migration/MigrationUtil.java | 10 +-- .../checkpoint/StateAssignmentOperation.java | 41 ++++++------ .../flink/runtime/checkpoint/SubtaskState.java | 14 ++--- .../savepoint/SavepointV1Serializer.java | 42 +++++++------ .../state/AbstractKeyedStateBackend.java | 2 +- .../runtime/state/KeyGroupsStateHandle.java | 39 ++++-------- .../flink/runtime/state/KeyedStateHandle.java | 40 ++++++++++++ .../state/StateInitializationContextImpl.java | 28 ++++++++- .../StateSnapshotContextSynchronousImpl.java | 12 ++-- .../flink/runtime/state/TaskStateHandles.java | 16 ++--- .../state/heap/HeapKeyedStateBackend.java | 46 ++++++++++---- .../checkpoint/CheckpointCoordinatorTest.java | 29 +++++---- .../checkpoint/CheckpointStateRestoreTest.java | 3 +- .../savepoint/MigrationV0ToV1Test.java | 14 ++++- .../KeyedStateCheckpointOutputStreamTest.java | 4 +- .../runtime/state/StateBackendTestBase.java | 66 ++++++++++---------- ...pKeyedStateBackendSnapshotMigrationTest.java | 3 +- .../api/operators/AbstractStreamOperator.java | 7 ++- .../api/operators/OperatorSnapshotResult.java | 18 +++--- .../runtime/tasks/OperatorStateHandles.java | 14 ++--- .../streaming/runtime/tasks/StreamTask.java | 14 ++--- .../operators/AbstractStreamOperatorTest.java | 10 +-- .../operators/OperatorSnapshotResultTest.java | 10 +-- .../StateInitializationContextImplTest.java | 9 +-- .../tasks/InterruptSensitiveRestoreTest.java | 17 ++--- .../streaming/runtime/tasks/StreamTaskTest.java | 14 ++--- .../util/AbstractStreamOperatorTestHarness.java | 25 ++++---- .../KeyedOneInputStreamOperatorTestHarness.java | 17 ++--- .../KeyedTwoInputStreamOperatorTestHarness.java | 3 +- 33 files changed, 389 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 2ce527f..0407070 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -40,6 +40,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.migration.MigrationNamespaceSerializerProxy; import org.apache.flink.migration.MigrationUtil; import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; @@ -52,6 +53,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; @@ -257,7 +259,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { * @throws Exception */ @Override - public RunnableFuture snapshot( + public RunnableFuture snapshot( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, @@ -286,8 +288,8 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { } // implementation of the async IO operation, based on FutureTask - AbstractAsyncIOCallable ioCallable = - new AbstractAsyncIOCallable() { + AbstractAsyncIOCallable ioCallable = + new AbstractAsyncIOCallable() { @Override public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { @@ -620,7 +622,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { } @Override - public void restore(Collection restoreState) throws Exception { + public void restore(Collection restoreState) throws Exception { LOG.info("Initializing RocksDB keyed state backend from snapshot."); if (LOG.isDebugEnabled()) { @@ -669,17 +671,23 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { /** * Restores all key-groups data that is referenced by the passed state handles. * - * @param keyGroupsStateHandles List of all key groups state handles that shall be restored. + * @param keyedStateHandles List of all key groups state handles that shall be restored. * @throws IOException * @throws ClassNotFoundException * @throws RocksDBException */ - public void doRestore(Collection keyGroupsStateHandles) + public void doRestore(Collection keyedStateHandles) throws IOException, ClassNotFoundException, RocksDBException { - for (KeyGroupsStateHandle keyGroupsStateHandle : keyGroupsStateHandles) { - if (keyGroupsStateHandle != null) { - this.currentKeyGroupsStateHandle = keyGroupsStateHandle; + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + if (keyedStateHandle != null) { + + if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; restoreKeyGroupsInStateHandle(); } } @@ -761,6 +769,12 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { private void restoreKVStateData() throws IOException, RocksDBException { //for all key-groups in the current state handle... for (Tuple2 keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { + int keyGroup = keyGroupOffset.f0; + + // Check that restored key groups all belong to the backend + Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), + "The key group must belong to the backend"); + long offset = keyGroupOffset.f1; //not empty key-group? if (0L != offset) { @@ -1143,15 +1157,25 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { * For backwards compatibility, remove again later! */ @Deprecated - private void restoreOldSavepointKeyedState(Collection restoreState) throws Exception { + private void restoreOldSavepointKeyedState(Collection restoreState) throws Exception { if (restoreState.isEmpty()) { return; } Preconditions.checkState(1 == restoreState.size(), "Only one element expected here."); + + KeyedStateHandle keyedStateHandle = restoreState.iterator().next(); + if (!(keyedStateHandle instanceof MigrationKeyGroupStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + MigrationKeyGroupStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + + MigrationKeyGroupStateHandle keyGroupStateHandle = (MigrationKeyGroupStateHandle) keyedStateHandle; + HashMap namedStates; - try (FSDataInputStream inputStream = restoreState.iterator().next().openInputStream()) { + try (FSDataInputStream inputStream = keyGroupStateHandle.openInputStream()) { namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader); } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 90de7a6..ffe2ce2 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; @@ -343,7 +344,7 @@ public class RocksDBAsyncSnapshotTest { StringSerializer.INSTANCE, new ValueStateDescriptor<>("foobar", String.class)); - RunnableFuture snapshotFuture = keyedStateBackend.snapshot( + RunnableFuture snapshotFuture = keyedStateBackend.snapshot( checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint()); try { http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 708613b..d95a9b4 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -172,7 +173,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); RocksDB spyDB = keyedStateBackend.db; @@ -210,7 +211,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); RocksDB spyDB = keyedStateBackend.db; @@ -239,7 +240,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); snapshot.cancel(true); verifyRocksObjectsReleased(); } @@ -247,7 +248,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); snapshot.cancel(true); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); @@ -264,7 +265,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); waiter.await(); // wait for snapshot to run @@ -272,10 +273,10 @@ public class RocksDBStateBackendTest extends StateBackendTestBase 0); - assertEquals(2, keyGroupsStateHandle.getNumberOfKeyGroups()); + KeyedStateHandle keyedStateHandle = snapshot.get(); + assertNotNull(keyedStateHandle); + assertTrue(keyedStateHandle.getStateSize() > 0); + assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); asyncSnapshotThread.join(); verifyRocksObjectsReleased(); @@ -284,7 +285,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); waiter.await(); // wait for snapshot to run http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java index f230bbc..dbe4230 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java @@ -26,7 +26,7 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.watermark.Watermark; @@ -128,8 +128,8 @@ public class CEPMigration12to13Test { final OperatorStateHandles snapshot = new OperatorStateHandles( (int) ois.readObject(), (StreamStateHandle) ois.readObject(), - (Collection) ois.readObject(), - (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), (Collection) ois.readObject(), (Collection) ois.readObject() ); @@ -243,8 +243,8 @@ public class CEPMigration12to13Test { final OperatorStateHandles snapshot = new OperatorStateHandles( (int) ois.readObject(), (StreamStateHandle) ois.readObject(), - (Collection) ois.readObject(), - (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), (Collection) ois.readObject(), (Collection) ois.readObject() ); @@ -363,8 +363,8 @@ public class CEPMigration12to13Test { final OperatorStateHandles snapshot = new OperatorStateHandles( (int) ois.readObject(), (StreamStateHandle) ois.readObject(), - (Collection) ois.readObject(), - (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), (Collection) ois.readObject(), (Collection) ois.readObject() ); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java index 9427f72..a4e3a2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java @@ -19,17 +19,17 @@ package org.apache.flink.migration; import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import java.util.Collection; public class MigrationUtil { @SuppressWarnings("deprecation") - public static boolean isOldSavepointKeyedState(Collection keyGroupsStateHandles) { - return (keyGroupsStateHandles != null) - && (keyGroupsStateHandles.size() == 1) - && (keyGroupsStateHandles.iterator().next() instanceof MigrationKeyGroupStateHandle); + public static boolean isOldSavepointKeyedState(Collection keyedStateHandles) { + return (keyedStateHandles != null) + && (keyedStateHandles.size() == 1) + && (keyedStateHandles.iterator().next() instanceof MigrationKeyGroupStateHandle); } } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index 3fda430..ac70e1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -160,8 +161,8 @@ public class StateAssignmentOperation { @SuppressWarnings("unchecked") List[] parallelOpStatesStream = new List[chainLength]; - List parallelKeyedStatesBackend = new ArrayList<>(oldParallelism); - List parallelKeyedStateStream = new ArrayList<>(oldParallelism); + List parallelKeyedStatesBackend = new ArrayList<>(oldParallelism); + List parallelKeyedStateStream = new ArrayList<>(oldParallelism); for (int p = 0; p < oldParallelism; ++p) { SubtaskState subtaskState = taskState.getState(p); @@ -173,12 +174,12 @@ public class StateAssignmentOperation { collectParallelStatesByChainOperator( parallelOpStatesStream, subtaskState.getRawOperatorState()); - KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); + KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); if (null != keyedStateBackend) { parallelKeyedStatesBackend.add(keyedStateBackend); } - KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState(); + KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState(); if (null != keyedStateStream) { parallelKeyedStateStream.add(keyedStateStream); } @@ -252,13 +253,13 @@ public class StateAssignmentOperation { .getTaskVertices()[subTaskIdx] .getCurrentExecutionAttempt(); - List newKeyedStatesBackend; - List newKeyedStateStream; + List newKeyedStatesBackend; + List newKeyedStateStream; if (oldParallelism == newParallelism) { SubtaskState subtaskState = taskState.getState(subTaskIdx); if (subtaskState != null) { - KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState(); - KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState(); + KeyedStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState(); + KeyedStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState(); newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList( oldKeyedStatesBackend) : null; newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList( @@ -269,8 +270,8 @@ public class StateAssignmentOperation { } } else { KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx); - newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds); - newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds); + newKeyedStatesBackend = getKeyedStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds); + newKeyedStateStream = getKeyedStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds); } TaskStateHandles taskStateHandles = new TaskStateHandles( @@ -290,19 +291,21 @@ public class StateAssignmentOperation { *

*

This is publicly visible to be used in tests. */ - public static List getKeyGroupsStateHandles( - Collection allKeyGroupsHandles, - KeyGroupRange subtaskKeyGroupIds) { + public static List getKeyedStateHandles( + Collection keyedStateHandles, + KeyGroupRange subtaskKeyGroupRange) { - List subtaskKeyGroupStates = new ArrayList<>(); + List subtaskKeyedStateHandles = new ArrayList<>(); - for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) { - KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds); - if (intersection.getNumberOfKeyGroups() > 0) { - subtaskKeyGroupStates.add(intersection); + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(subtaskKeyGroupRange); + + if (intersectedKeyedStateHandle != null) { + subtaskKeyedStateHandles.add(intersectedKeyedStateHandle); } } - return subtaskKeyGroupStates; + + return subtaskKeyedStateHandles; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java index 1393e32..9e195b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; @@ -56,12 +56,12 @@ public class SubtaskState implements StateObject { /** * Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}. */ - private final KeyGroupsStateHandle managedKeyedState; + private final KeyedStateHandle managedKeyedState; /** * Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}. */ - private final KeyGroupsStateHandle rawKeyedState; + private final KeyedStateHandle rawKeyedState; /** * The state size. This is also part of the deserialized state handle. @@ -74,8 +74,8 @@ public class SubtaskState implements StateObject { ChainedStateHandle legacyOperatorState, ChainedStateHandle managedOperatorState, ChainedStateHandle rawOperatorState, - KeyGroupsStateHandle managedKeyedState, - KeyGroupsStateHandle rawKeyedState) { + KeyedStateHandle managedKeyedState, + KeyedStateHandle rawKeyedState) { this.legacyOperatorState = checkNotNull(legacyOperatorState, "State"); this.managedOperatorState = managedOperatorState; @@ -114,11 +114,11 @@ public class SubtaskState implements StateObject { return rawOperatorState; } - public KeyGroupsStateHandle getManagedKeyedState() { + public KeyedStateHandle getManagedKeyedState() { return managedKeyedState; } - public KeyGroupsStateHandle getRawKeyedState() { + public KeyedStateHandle getRawKeyedState() { return rawKeyedState; } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index ba1949a..44461d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; @@ -155,11 +156,11 @@ class SavepointV1Serializer implements SavepointSerializer { serializeOperatorStateHandle(stateHandle, dos); } - KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); - serializeKeyGroupStateHandle(keyedStateBackend, dos); + KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); + serializeKeyedStateHandle(keyedStateBackend, dos); - KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState(); - serializeKeyGroupStateHandle(keyedStateStream, dos); + KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState(); + serializeKeyedStateHandle(keyedStateStream, dos); } private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException { @@ -188,9 +189,9 @@ class SavepointV1Serializer implements SavepointSerializer { operatorStateStream.add(streamStateHandle); } - KeyGroupsStateHandle keyedStateBackend = deserializeKeyGroupStateHandle(dis); + KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis); - KeyGroupsStateHandle keyedStateStream = deserializeKeyGroupStateHandle(dis); + KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis); ChainedStateHandle nonPartitionableStateChain = new ChainedStateHandle<>(nonPartitionableState); @@ -209,23 +210,27 @@ class SavepointV1Serializer implements SavepointSerializer { keyedStateStream); } - private static void serializeKeyGroupStateHandle( - KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException { + private static void serializeKeyedStateHandle( + KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException { + + if (stateHandle == null) { + dos.writeByte(NULL_HANDLE); + } else if (stateHandle instanceof KeyGroupsStateHandle) { + KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle; - if (stateHandle != null) { dos.writeByte(KEY_GROUPS_HANDLE); - dos.writeInt(stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup()); - dos.writeInt(stateHandle.getNumberOfKeyGroups()); - for (int keyGroup : stateHandle.keyGroups()) { - dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup)); + dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup()); + dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); + for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) { + dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup)); } - serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos); + serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos); } else { - dos.writeByte(NULL_HANDLE); + throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass()); } } - private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException { + private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException { final int type = dis.readByte(); if (NULL_HANDLE == type) { return null; @@ -237,11 +242,12 @@ class SavepointV1Serializer implements SavepointSerializer { for (int i = 0; i < numKeyGroups; ++i) { offsets[i] = dis.readLong(); } - KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets); + KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets( + keyGroupRange, offsets); StreamStateHandle stateHandle = deserializeStreamStateHandle(dis); return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); } else { - throw new IllegalStateException("Reading invalid KeyGroupsStateHandle, type: " + type); + throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type); } } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index e6e7b23..e86f1f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -61,7 +61,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param Type of the key by which state is keyed. */ public abstract class AbstractKeyedStateBackend - implements KeyedStateBackend, Snapshotable, Closeable { + implements KeyedStateBackend, Snapshotable, Closeable { /** {@link TypeSerializer} for our key. */ protected final TypeSerializer keySerializer; http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index b454e42..bad7fd4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -29,7 +29,7 @@ import java.io.IOException; * consists of a range of key group snapshots. A key group is subset of the available * key space. The key groups are identified by their key group indices. */ -public class KeyGroupsStateHandle implements StreamStateHandle { +public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle { private static final long serialVersionUID = -8070326169926626355L; @@ -54,20 +54,18 @@ public class KeyGroupsStateHandle implements StreamStateHandle { /** * - * @return iterable over the key-group range for the key-group state referenced by this handle + * @return the internal key-group range to offsets metadata */ - public Iterable keyGroups() { - return groupRangeOffsets.getKeyGroupRange(); + public KeyGroupRangeOffsets getGroupRangeOffsets() { + return groupRangeOffsets; } - /** * - * @param keyGroupId the id of a key-group - * @return true if the provided key-group id is contained in the key-group range of this handle + * @return The handle to the actual states */ - public boolean containsKeyGroup(int keyGroupId) { - return groupRangeOffsets.getKeyGroupRange().contains(keyGroupId); + public StreamStateHandle getDelegateStateHandle() { + return stateHandle; } /** @@ -85,24 +83,13 @@ public class KeyGroupsStateHandle implements StreamStateHandle { * @return key-group state over a range that is the intersection between this handle's key-group range and the * provided key-group range. */ - public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange) { + public KeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) { return new KeyGroupsStateHandle(groupRangeOffsets.getIntersection(keyGroupRange), stateHandle); } - /** - * - * @return the internal key-group range to offsets metadata - */ - public KeyGroupRangeOffsets getGroupRangeOffsets() { - return groupRangeOffsets; - } - - /** - * - * @return number of key-groups in the key-group range of this handle - */ - public int getNumberOfKeyGroups() { - return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups(); + @Override + public KeyGroupRange getKeyGroupRange() { + return groupRangeOffsets.getKeyGroupRange(); } @Override @@ -120,10 +107,6 @@ public class KeyGroupsStateHandle implements StreamStateHandle { return stateHandle.openInputStream(); } - public StreamStateHandle getDelegateStateHandle() { - return stateHandle; - } - @Override public boolean equals(Object o) { if (this == o) { http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java new file mode 100644 index 0000000..dc9c97d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +/** + * Base for the handles of the checkpointed states in keyed streams. When + * recovering from failures, the handle will be passed to all tasks whose key + * group ranges overlap with it. + */ +public interface KeyedStateHandle extends StateObject { + + /** + * Returns the range of the key groups contained in the state. + */ + KeyGroupRange getKeyGroupRange(); + + /** + * Returns a state over a range that is the intersection between this + * handle's key-group range and the provided key-group range. + * + * @param keyGroupRange The key group range to intersect with + */ + KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange); +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index 886d214..d82af72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -27,9 +27,11 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.util.Preconditions; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; /** @@ -55,7 +57,7 @@ public class StateInitializationContextImpl implements StateInitializationContex boolean restored, OperatorStateStore operatorStateStore, KeyedStateStore keyedStateStore, - Collection keyGroupsStateHandles, + Collection keyedStateHandles, Collection operatorStateHandles, CloseableRegistry closableRegistry) { @@ -64,7 +66,7 @@ public class StateInitializationContextImpl implements StateInitializationContex this.operatorStateStore = operatorStateStore; this.keyedStateStore = keyedStateStore; this.operatorStateHandles = operatorStateHandles; - this.keyGroupsStateHandles = keyGroupsStateHandles; + this.keyGroupsStateHandles = transform(keyedStateHandles); this.keyedStateIterable = keyGroupsStateHandles == null ? null @@ -136,6 +138,26 @@ public class StateInitializationContextImpl implements StateInitializationContex IOUtils.closeQuietly(closableRegistry); } + private static Collection transform(Collection keyedStateHandles) { + if (keyedStateHandles == null) { + return null; + } + + List keyGroupsStateHandles = new ArrayList<>(); + + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + if (! (keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass() + "."); + } + + keyGroupsStateHandles.add((KeyGroupsStateHandle) keyedStateHandle); + } + + return keyGroupsStateHandles; + } + private static class KeyGroupStreamIterator extends AbstractStateStreamIterator { @@ -159,7 +181,7 @@ public class StateInitializationContextImpl implements StateInitializationContex while (stateHandleIterator.hasNext()) { currentStateHandle = stateHandleIterator.next(); - if (currentStateHandle.getNumberOfKeyGroups() > 0) { + if (currentStateHandle.getKeyGroupRange().getNumberOfKeyGroups() > 0) { currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator(); return true; http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java index 96edccb..5db0138 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java @@ -109,15 +109,17 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext return operatorStateCheckpointOutputStream; } - public RunnableFuture getKeyedStateStreamFuture() throws IOException { - return closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream); + public RunnableFuture getKeyedStateStreamFuture() throws IOException { + KeyGroupsStateHandle keyGroupsStateHandle = closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream); + return new DoneFuture(keyGroupsStateHandle); } public RunnableFuture getOperatorStateStreamFuture() throws IOException { - return closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream); + OperatorStateHandle operatorStateHandle = closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream); + return new DoneFuture<>(operatorStateHandle); } - private RunnableFuture closeAndUnregisterStreamToObtainStateHandle( + private T closeAndUnregisterStreamToObtainStateHandle( NonClosingCheckpointOutputStream stream) throws IOException { if (null == stream) { return null; @@ -126,7 +128,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext closableRegistry.unregisterClosable(stream.getDelegate()); // for now we only support synchronous writing - return new DoneFuture<>(stream.closeAndGetHandle()); + return stream.closeAndGetHandle(); } private void closeAndUnregisterStream(NonClosingCheckpointOutputStream stream) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java index 417a9dd..450413a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java @@ -40,10 +40,10 @@ public class TaskStateHandles implements Serializable { private final ChainedStateHandle legacyOperatorState; /** Collection of handles which represent the managed keyed state of the head operator */ - private final Collection managedKeyedState; + private final Collection managedKeyedState; /** Collection of handles which represent the raw/streamed keyed state of the head operator */ - private final Collection rawKeyedState; + private final Collection rawKeyedState; /** Outer list represents the operator chain, each collection holds handles for managed state of a single operator */ private final List> managedOperatorState; @@ -67,8 +67,8 @@ public class TaskStateHandles implements Serializable { ChainedStateHandle legacyOperatorState, List> managedOperatorState, List> rawOperatorState, - Collection managedKeyedState, - Collection rawKeyedState) { + Collection managedKeyedState, + Collection rawKeyedState) { this.legacyOperatorState = legacyOperatorState; this.managedKeyedState = managedKeyedState; @@ -82,11 +82,11 @@ public class TaskStateHandles implements Serializable { return legacyOperatorState; } - public Collection getManagedKeyedState() { + public Collection getManagedKeyedState() { return managedKeyedState; } - public Collection getRawKeyedState() { + public Collection getRawKeyedState() { return rawKeyedState; } @@ -110,8 +110,8 @@ public class TaskStateHandles implements Serializable { return out; } - private static List transform(KeyGroupsStateHandle in) { - return in == null ? Collections.emptyList() : Collections.singletonList(in); + private static List transform(T in) { + return in == null ? Collections.emptyList() : Collections.singletonList(in); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 46ec5c2..a332d7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -39,6 +39,7 @@ import org.apache.flink.migration.runtime.state.KvStateSnapshot; import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot; import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; +import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -50,6 +51,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; @@ -223,7 +225,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { @Override @SuppressWarnings("unchecked") - public RunnableFuture snapshot( + public RunnableFuture snapshot( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, @@ -267,8 +269,8 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { //--------------------------------------------------- this becomes the end of sync part // implementation of the async IO operation, based on FutureTask - final AbstractAsyncIOCallable ioCallable = - new AbstractAsyncIOCallable() { + final AbstractAsyncIOCallable ioCallable = + new AbstractAsyncIOCallable() { AtomicBoolean open = new AtomicBoolean(false); @@ -340,7 +342,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } }; - AsyncStoppableTaskWithCallback task = AsyncStoppableTaskWithCallback.from(ioCallable); + AsyncStoppableTaskWithCallback task = AsyncStoppableTaskWithCallback.from(ioCallable); if (!asynchronousSnapshots) { task.run(); @@ -354,7 +356,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { @SuppressWarnings("deprecation") @Override - public void restore(Collection restoredState) throws Exception { + public void restore(Collection restoredState) throws Exception { LOG.info("Initializing heap keyed state backend from snapshot."); if (LOG.isDebugEnabled()) { @@ -369,19 +371,26 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } @SuppressWarnings({"unchecked"}) - private void restorePartitionedState(Collection state) throws Exception { + private void restorePartitionedState(Collection state) throws Exception { final Map kvStatesById = new HashMap<>(); int numRegisteredKvStates = 0; stateTables.clear(); - for (KeyGroupsStateHandle keyGroupsHandle : state) { + for (KeyedStateHandle keyedStateHandle : state) { - if (keyGroupsHandle == null) { + if (keyedStateHandle == null) { continue; } - FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream(); + if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + + KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; + FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream(); cancelStreamRegistry.registerClosable(fsDataInputStream); try { @@ -412,9 +421,13 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { } } - for (Tuple2 groupOffset : keyGroupsHandle.getGroupRangeOffsets()) { + for (Tuple2 groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) { int keyGroupIndex = groupOffset.f0; long offset = groupOffset.f1; + + // Check that restored key groups all belong to the backend. + Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The key group must belong to the backend."); + fsDataInputStream.seek(offset); int writtenKeyGroupIndex = inView.readInt(); @@ -449,7 +462,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { @SuppressWarnings({"unchecked", "rawtypes", "DeprecatedIsStillUsed"}) @Deprecated private void restoreOldSavepointKeyedState( - Collection stateHandles) throws IOException, ClassNotFoundException { + Collection stateHandles) throws IOException, ClassNotFoundException { if (stateHandles.isEmpty()) { return; @@ -457,8 +470,17 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here."); + KeyedStateHandle keyedStateHandle = stateHandles.iterator().next(); + if (!(keyedStateHandle instanceof MigrationKeyGroupStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + MigrationKeyGroupStateHandle.class + + ", but found " + keyedStateHandle.getClass()); + } + + MigrationKeyGroupStateHandle keyGroupStateHandle = (MigrationKeyGroupStateHandle) keyedStateHandle; + HashMap> namedStates; - try (FSDataInputStream inputStream = stateHandles.iterator().next().openInputStream()) { + try (FSDataInputStream inputStream = keyGroupStateHandle.openInputStream()) { namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader); } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index d8bba59..117c70d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -2346,13 +2347,13 @@ public class CheckpointCoordinatorTest { ChainedStateHandle operatorState = taskStateHandles.getLegacyOperatorState(); List> opStateBackend = taskStateHandles.getManagedOperatorState(); List> opStateRaw = taskStateHandles.getRawOperatorState(); - Collection keyGroupStateBackend = taskStateHandles.getManagedKeyedState(); - Collection keyGroupStateRaw = taskStateHandles.getRawKeyedState(); + Collection keyedStateBackend = taskStateHandles.getManagedKeyedState(); + Collection keyGroupStateRaw = taskStateHandles.getRawKeyedState(); actualOpStatesBackend.add(opStateBackend); actualOpStatesRaw.add(opStateRaw); assertNull(operatorState); - compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyGroupStateBackend); + compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyedStateBackend); compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw); } comparePartitionableState(expectedOpStatesBackend, actualOpStatesBackend); @@ -2690,32 +2691,38 @@ public class CheckpointCoordinatorTest { KeyGroupsStateHandle expectPartitionedKeyGroupState = generateKeyGroupState( jobVertexID, keyGroupPartitions.get(i), false); - Collection actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState(); + Collection actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState(); compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), actualPartitionedKeyGroupState); } } public static void compareKeyedState( Collection expectPartitionedKeyGroupState, - Collection actualPartitionedKeyGroupState) throws Exception { + Collection actualPartitionedKeyGroupState) throws Exception { KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next(); - int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getNumberOfKeyGroups(); + int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups(); int actualTotalKeyGroups = 0; - for(KeyGroupsStateHandle keyGroupsStateHandle: actualPartitionedKeyGroupState) { - actualTotalKeyGroups += keyGroupsStateHandle.getNumberOfKeyGroups(); + for(KeyedStateHandle keyedStateHandle: actualPartitionedKeyGroupState) { + assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle); + + actualTotalKeyGroups += keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups(); } assertEquals(expectedTotalKeyGroups, actualTotalKeyGroups); try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) { - for (int groupId : expectedHeadOpKeyGroupStateHandle.keyGroups()) { + for (int groupId : expectedHeadOpKeyGroupStateHandle.getKeyGroupRange()) { long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId); inputStream.seek(offset); int expectedKeyGroupState = InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader()); - for (KeyGroupsStateHandle oneActualKeyGroupStateHandle : actualPartitionedKeyGroupState) { - if (oneActualKeyGroupStateHandle.containsKeyGroup(groupId)) { + for (KeyedStateHandle oneActualKeyedStateHandle : actualPartitionedKeyGroupState) { + + assertTrue(oneActualKeyedStateHandle instanceof KeyGroupsStateHandle); + + KeyGroupsStateHandle oneActualKeyGroupStateHandle = (KeyGroupsStateHandle) oneActualKeyedStateHandle; + if (oneActualKeyGroupStateHandle.getKeyGroupRange().contains(groupId)) { long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId); try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) { actualInputStream.seek(actualOffset); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 18b07eb..7e0a7c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -68,7 +69,7 @@ public class CheckpointStateRestoreTest { final ChainedStateHandle serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject()); KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0); List testStates = Collections.singletonList(new SerializableObject()); - final KeyGroupsStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates); + final KeyedStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates); final JobID jid = new JobID(); final JobVertexID statefulId = new JobVertexID(); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java index 6ab8620..1ecb2e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -58,6 +59,7 @@ import java.util.concurrent.ThreadLocalRandom; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @SuppressWarnings("deprecation") public class MigrationV0ToV1Test { @@ -154,9 +156,15 @@ public class MigrationV0ToV1Test { } //check keyed state - KeyGroupsStateHandle keyGroupsStateHandle = subtaskState.getManagedKeyedState(); + KeyedStateHandle keyedStateHandle = subtaskState.getManagedKeyedState(); + if (t % 3 != 0) { - assertEquals(1, keyGroupsStateHandle.getNumberOfKeyGroups()); + + assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle); + + KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; + + assertEquals(1, keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup()); ByteStreamStateHandle stateHandle = @@ -172,7 +180,7 @@ public class MigrationV0ToV1Test { assertEquals(p, data[1]); } } else { - assertEquals(null, keyGroupsStateHandle); + assertEquals(null, keyedStateHandle); } } http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java index 0c4ed74..cee0b02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java @@ -135,7 +135,7 @@ public class KeyedStateCheckpointOutputStreamTest { int count = 0; try (FSDataInputStream in = fullHandle.openInputStream()) { DataInputView div = new DataInputViewStreamWrapper(in); - for (int kg : fullHandle.keyGroups()) { + for (int kg : fullHandle.getKeyGroupRange()) { long off = fullHandle.getOffsetForKeyGroup(kg); if (off >= 0) { in.seek(off); @@ -152,7 +152,7 @@ public class KeyedStateCheckpointOutputStreamTest { int count = 0; try (FSDataInputStream in = fullHandle.openInputStream()) { DataInputView div = new DataInputViewStreamWrapper(in); - for (int kg : fullHandle.keyGroups()) { + for (int kg : fullHandle.getKeyGroupRange()) { long off = fullHandle.getOffsetForKeyGroup(kg); in.seek(off); Assert.assertEquals(kg, div.readInt()); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 22bb715..ccc1eae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -143,13 +143,13 @@ public abstract class StateBackendTestBase exten env.getTaskKvStateRegistry()); } - protected AbstractKeyedStateBackend restoreKeyedBackend(TypeSerializer keySerializer, KeyGroupsStateHandle state) throws Exception { + protected AbstractKeyedStateBackend restoreKeyedBackend(TypeSerializer keySerializer, KeyedStateHandle state) throws Exception { return restoreKeyedBackend(keySerializer, state, new DummyEnvironment("test", 1, 0)); } protected AbstractKeyedStateBackend restoreKeyedBackend( TypeSerializer keySerializer, - KeyGroupsStateHandle state, + KeyedStateHandle state, Environment env) throws Exception { return restoreKeyedBackend( keySerializer, @@ -163,7 +163,7 @@ public abstract class StateBackendTestBase exten TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - List state, + List state, Environment env) throws Exception { AbstractKeyedStateBackend backend = getStateBackend().createKeyedStateBackend( @@ -436,7 +436,7 @@ public abstract class StateBackendTestBase exten backend.setCurrentKey(2); state.update(new TestPojo("u2", 2)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -497,7 +497,7 @@ public abstract class StateBackendTestBase exten backend.setCurrentKey(2); state.update(new TestPojo("u2", 2)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -524,7 +524,7 @@ public abstract class StateBackendTestBase exten // update to test state backends that eagerly serialize, such as RocksDB state.update(new TestPojo("u1", 11)); - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -585,7 +585,7 @@ public abstract class StateBackendTestBase exten backend.setCurrentKey(2); state.update(new TestPojo("u2", 2)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -611,7 +611,7 @@ public abstract class StateBackendTestBase exten // update to test state backends that eagerly serialize, such as RocksDB state.update(new TestPojo("u1", 11)); - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -670,7 +670,7 @@ public abstract class StateBackendTestBase exten assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -681,7 +681,7 @@ public abstract class StateBackendTestBase exten state.update("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -880,7 +880,7 @@ public abstract class StateBackendTestBase exten assertEquals(13, (int) state2.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend( @@ -952,7 +952,7 @@ public abstract class StateBackendTestBase exten assertEquals(42L, (long) state.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); @@ -997,7 +997,7 @@ public abstract class StateBackendTestBase exten assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -1008,7 +1008,7 @@ public abstract class StateBackendTestBase exten state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1091,7 +1091,7 @@ public abstract class StateBackendTestBase exten assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -1102,7 +1102,7 @@ public abstract class StateBackendTestBase exten state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1188,7 +1188,7 @@ public abstract class StateBackendTestBase exten assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -1200,7 +1200,7 @@ public abstract class StateBackendTestBase exten state.add(103); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1287,7 +1287,7 @@ public abstract class StateBackendTestBase exten getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -1299,7 +1299,7 @@ public abstract class StateBackendTestBase exten state.putAll(new HashMap() {{ put(1031, "1031"); put(1032, "1032"); }}); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1606,13 +1606,13 @@ public abstract class StateBackendTestBase exten state.update("ShouldBeInSecondHalf"); - KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); - List firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles( + List firstHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles( Collections.singletonList(snapshot), KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 0)); - List secondHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles( + List secondHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles( Collections.singletonList(snapshot), KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 1)); @@ -1672,7 +1672,7 @@ public abstract class StateBackendTestBase exten state.update("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1723,7 +1723,7 @@ public abstract class StateBackendTestBase exten state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1776,7 +1776,7 @@ public abstract class StateBackendTestBase exten state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1827,7 +1827,7 @@ public abstract class StateBackendTestBase exten state.put("2", "Second"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -2093,7 +2093,7 @@ public abstract class StateBackendTestBase exten eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); - KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); @@ -2124,7 +2124,7 @@ public abstract class StateBackendTestBase exten ListStateDescriptor kvId = new ListStateDescriptor<>("id", String.class); // draw a snapshot - KeyGroupsStateHandle snapshot = + KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); assertNull(snapshot); backend.dispose(); @@ -2152,7 +2152,7 @@ public abstract class StateBackendTestBase exten streamFactory.setWaiterLatch(waiter); AbstractKeyedStateBackend backend = null; - KeyGroupsStateHandle stateHandle = null; + KeyedStateHandle stateHandle = null; try { backend = createKeyedBackend(IntSerializer.INSTANCE); @@ -2167,7 +2167,7 @@ public abstract class StateBackendTestBase exten valueState.update(i); } - RunnableFuture snapshot = + RunnableFuture snapshot = backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); Thread runner = new Thread(snapshot); runner.start(); @@ -2249,7 +2249,7 @@ public abstract class StateBackendTestBase exten valueState.update(i); } - RunnableFuture snapshot = + RunnableFuture snapshot = backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); Thread runner = new Thread(snapshot); @@ -2367,7 +2367,7 @@ public abstract class StateBackendTestBase exten } } - private KeyGroupsStateHandle runSnapshot(RunnableFuture snapshotRunnableFuture) throws Exception { + private KeyedStateHandle runSnapshot(RunnableFuture snapshotRunnableFuture) throws Exception { if(!snapshotRunnableFuture.isDone()) { Thread runner = new Thread(snapshotRunnableFuture); runner.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java index da0666a..3754d63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -63,7 +64,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) { stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader()); } - keyedBackend.restore(Collections.singleton(stateHandle)); + keyedBackend.restore(Collections.singleton(stateHandle)); final ListStateDescriptor stateDescr = new ListStateDescriptor<>("my-state", Long.class); stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); http://git-wip-us.apache.org/repos/asf/flink/blob/cd552741/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index e40a59b..a6a89b5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators; -import java.io.IOException; import org.apache.commons.io.IOUtils; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.flink.annotation.PublicEvolving; @@ -47,9 +46,9 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyGroupsList; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; @@ -70,6 +69,7 @@ import org.apache.flink.util.OutputTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.Collection; import java.util.ConcurrentModificationException; @@ -198,7 +198,7 @@ public abstract class AbstractStreamOperator @Override public final void initializeState(OperatorStateHandles stateHandles) throws Exception { - Collection keyedStateHandlesRaw = null; + Collection keyedStateHandlesRaw = null; Collection operatorStateHandlesRaw = null; Collection operatorStateHandlesBackend = null; @@ -473,6 +473,7 @@ public abstract class AbstractStreamOperator // and then initialize the timer services for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) { int keyGroupIdx = streamProvider.getKeyGroupId(); + checkArgument(localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range.");