Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7754E200BD8 for ; Wed, 7 Dec 2016 14:06:31 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 75E29160B2B; Wed, 7 Dec 2016 13:06:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4B368160AFD for ; Wed, 7 Dec 2016 14:06:30 +0100 (CET) Received: (qmail 78117 invoked by uid 500); 7 Dec 2016 13:06:29 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 78108 invoked by uid 99); 7 Dec 2016 13:06:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Dec 2016 13:06:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4DB92F179B; Wed, 7 Dec 2016 13:06:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uce@apache.org To: commits@flink.apache.org Date: Wed, 07 Dec 2016 13:06:29 -0000 Message-Id: <1fb48f19262f4d778001def5f5b4b38d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] flink git commit: [FLINK-5041] Savepoint backwards compatibility 1.1 -> 1.2 archived-at: Wed, 07 Dec 2016 13:06:31 -0000 Repository: flink Updated Branches: refs/heads/master a6e80da30 -> af3bf837a http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java new file mode 100644 index 0000000..02365c7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.savepoint; + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0; +import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer; +import org.apache.flink.migration.runtime.state.KvStateSnapshot; +import org.apache.flink.migration.runtime.state.memory.MemValueState; +import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle; +import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState; +import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList; +import org.apache.flink.migration.util.MigrationInstantiationUtil; +import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class MigrationV0ToV1Test { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + /** + * Simple test of savepoint methods. + */ + @Test + public void testSavepointMigrationV0ToV1() throws Exception { + + String target = tmp.getRoot().getAbsolutePath(); + + assertEquals(0, tmp.getRoot().listFiles().length); + + long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE); + int numTaskStates = 4; + int numSubtaskStates = 16; + + Collection expected = + createTaskStatesOld(numTaskStates, numSubtaskStates); + + SavepointV0 savepoint = new SavepointV0(checkpointId, expected); + + assertEquals(SavepointV0.VERSION, savepoint.getVersion()); + assertEquals(checkpointId, savepoint.getCheckpointId()); + assertEquals(expected, savepoint.getOldTaskStates()); + + assertFalse(savepoint.getOldTaskStates().isEmpty()); + + Exception latestException = null; + Path path = null; + FSDataOutputStream fdos = null; + + FileSystem fs = null; + + try { + + // Try to create a FS output stream + for (int attempt = 0; attempt < 10; attempt++) { + path = new Path(target, FileUtils.getRandomFilename("savepoint-")); + + if (fs == null) { + fs = FileSystem.get(path.toUri()); + } + + try { + fdos = fs.create(path, false); + break; + } catch (Exception e) { + latestException = e; + } + } + + if (fdos == null) { + throw new IOException("Failed to create file output stream at " + path, latestException); + } + + try (DataOutputStream dos = new DataOutputStream(fdos)) { + dos.writeInt(SavepointStore.MAGIC_NUMBER); + dos.writeInt(savepoint.getVersion()); + SavepointV0Serializer.INSTANCE.serializeOld(savepoint, dos); + } + + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + + Savepoint sp = SavepointStore.loadSavepoint(path.toString(), cl); + int t = 0; + for (TaskState taskState : sp.getTaskStates()) { + for (int p = 0; p < taskState.getParallelism(); ++p) { + SubtaskState subtaskState = taskState.getState(p); + ChainedStateHandle legacyOperatorState = subtaskState.getLegacyOperatorState(); + for (int c = 0; c < legacyOperatorState.getLength(); ++c) { + StreamStateHandle stateHandle = legacyOperatorState.get(c); + try (InputStream is = stateHandle.openInputStream()) { + Tuple4 expTestState = new Tuple4<>(0, t, p, c); + Tuple4 actTestState = null; + //check function state + if (p % 4 != 0) { + assertEquals(1, is.read()); + actTestState = InstantiationUtil.deserializeObject(is, cl); + assertEquals(expTestState, actTestState); + } else { + assertEquals(0, is.read()); + } + + //check operator state + expTestState.f0 = 1; + if (p % 3 != 0) { + assertEquals(1, is.read()); + actTestState = InstantiationUtil.deserializeObject(is, cl); + assertEquals(expTestState, actTestState); + } else { + assertEquals(0, is.read()); + } + } + } + + //check keyed state + KeyGroupsStateHandle keyGroupsStateHandle = subtaskState.getManagedKeyedState(); + if (t % 3 != 0) { + assertEquals(1, keyGroupsStateHandle.getNumberOfKeyGroups()); + assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup()); + + ByteStreamStateHandle stateHandle = + (ByteStreamStateHandle) keyGroupsStateHandle.getDelegateStateHandle(); + HashMap> testKeyedState = + MigrationInstantiationUtil.deserializeObject(stateHandle.getData(), cl); + + assertEquals(2, testKeyedState.size()); + for (KvStateSnapshot snapshot : testKeyedState.values()) { + MemValueState.Snapshot castedSnapshot = (MemValueState.Snapshot) snapshot; + byte[] data = castedSnapshot.getData(); + assertEquals(t, data[0]); + assertEquals(p, data[1]); + } + } else { + assertEquals(null, keyGroupsStateHandle); + } + } + + ++t; + } + + savepoint.dispose(); + + } finally { + // Dispose + SavepointStore.removeSavepoint(path.toString()); + } + } + + private static Collection createTaskStatesOld( + int numTaskStates, int numSubtaskStates) throws Exception { + + List taskStates = new ArrayList<>(numTaskStates); + + for (int i = 0; i < numTaskStates; i++) { + org.apache.flink.migration.runtime.checkpoint.TaskState taskState = + new org.apache.flink.migration.runtime.checkpoint.TaskState(new JobVertexID(), numSubtaskStates); + for (int j = 0; j < numSubtaskStates; j++) { + + StreamTaskState[] streamTaskStates = new StreamTaskState[2]; + + for (int k = 0; k < streamTaskStates.length; k++) { + StreamTaskState state = new StreamTaskState(); + Tuple4 testState = new Tuple4<>(0, i, j, k); + if (j % 4 != 0) { + state.setFunctionState(new SerializedStateHandle(testState)); + } + testState = new Tuple4<>(1, i, j, k); + if (j % 3 != 0) { + state.setOperatorState(new SerializedStateHandle<>(testState)); + } + + if ((0 == k) && (i % 3 != 0)) { + HashMap> testKeyedState = new HashMap<>(2); + for (int l = 0; l < 2; ++l) { + String name = "keyed-" + l; + KvStateSnapshot testKeyedSnapshot = + new MemValueState.Snapshot<>( + IntSerializer.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + IntSerializer.INSTANCE, + new ValueStateDescriptor<>(name, Integer.class, 0), + new byte[]{(byte) i, (byte) j}); + testKeyedState.put(name, testKeyedSnapshot); + } + state.setKvStates(testKeyedState); + } + streamTaskStates[k] = state; + } + + StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStates); + org.apache.flink.migration.util.SerializedValue< + org.apache.flink.migration.runtime.state.StateHandle> handle = + new org.apache.flink.migration.util.SerializedValue< + org.apache.flink.migration.runtime.state.StateHandle>(streamTaskStateList); + + taskState.putState(j, new org.apache.flink.migration.runtime.checkpoint.SubtaskState(handle, 0, 0)); + } + + taskStates.add(taskState); + } + + return taskStates; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java index e1b83f4..67575d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java @@ -77,8 +77,10 @@ public class SavepointLoaderTest { Map tasks = new HashMap<>(); tasks.put(vertexId, vertex); + ClassLoader ucl = Thread.currentThread().getContextClassLoader(); + // 1) Load and validate: everything correct - CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false); + CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false); assertEquals(jobId, loaded.getJobId()); assertEquals(checkpointId, loaded.getCheckpointID()); @@ -87,7 +89,7 @@ public class SavepointLoaderTest { when(vertex.getMaxParallelism()).thenReturn(222); try { - SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false); + SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false); fail("Did not throw expected Exception"); } catch (IllegalStateException expected) { assertTrue(expected.getMessage().contains("Max parallelism mismatch")); @@ -97,13 +99,13 @@ public class SavepointLoaderTest { assertNotNull(tasks.remove(vertexId)); try { - SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false); + SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false); fail("Did not throw expected Exception"); } catch (IllegalStateException expected) { assertTrue(expected.getMessage().contains("allowNonRestoredState")); } // 4) Load and validate: ignore missing vertex - SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, true); + SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, true); } } http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java index 8eed6ea..3398341 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java @@ -64,7 +64,7 @@ public class SavepointStoreTest { assertEquals(1, tmp.getRoot().listFiles().length); // Load - Savepoint loaded = SavepointStore.loadSavepoint(path); + Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader()); assertEquals(stored, loaded); loaded.dispose(); @@ -89,7 +89,7 @@ public class SavepointStoreTest { } try { - SavepointStore.loadSavepoint(filePath.toString()); + SavepointStore.loadSavepoint(filePath.toString(), Thread.currentThread().getContextClassLoader()); fail("Did not throw expected Exception"); } catch (RuntimeException e) { assertTrue(e.getMessage().contains("Flink 1.0") && e.getMessage().contains("Unexpected magic number")); @@ -128,10 +128,10 @@ public class SavepointStoreTest { assertEquals(2, tmp.getRoot().listFiles().length); // Load - Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint); + Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader()); assertEquals(newSavepoint, loaded); - loaded = SavepointStore.loadSavepoint(pathSavepoint); + loaded = SavepointStore.loadSavepoint(pathSavepoint, Thread.currentThread().getContextClassLoader()); assertEquals(savepoint, loaded); } @@ -176,7 +176,7 @@ public class SavepointStoreTest { } @Override - public TestSavepoint deserialize(DataInputStream dis) throws IOException { + public TestSavepoint deserialize(DataInputStream dis, ClassLoader userCL) throws IOException { int version = dis.readInt(); long checkpointId = dis.readLong(); return new TestSavepoint(version, checkpointId); http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java index 508a69d..58cf1aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.checkpoint.savepoint; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; @@ -50,7 +49,9 @@ public class SavepointV1SerializerTest { // Deserialize ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais)); + Savepoint actual = serializer.deserialize( + new DataInputViewStreamWrapper(bais), + Thread.currentThread().getContextClassLoader()); assertEquals(expected, actual); } http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java new file mode 100644 index 0000000..dd34f03 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class MultiStreamStateHandleTest { + + private static final int TEST_DATA_LENGTH = 123; + private Random random; + private byte[] testData; + private List streamStateHandles; + + @Before + public void setup() { + random = new Random(0x42); + testData = new byte[TEST_DATA_LENGTH]; + for (int i = 0; i < testData.length; ++i) { + testData[i] = (byte) i; + } + + int idx = 0; + streamStateHandles = new ArrayList<>(); + while (idx < testData.length) { + int len = random.nextInt(5); + byte[] sub = Arrays.copyOfRange(testData, idx, idx + len); + streamStateHandles.add(new ByteStreamStateHandle(String.valueOf(idx), sub)); + idx += len; + } + } + + @Test + public void testMetaData() throws IOException { + MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles); + assertEquals(TEST_DATA_LENGTH, multiStreamStateHandle.getStateSize()); + } + + @Test + public void testLinearRead() throws IOException { + MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles); + try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) { + + for (int i = 0; i < TEST_DATA_LENGTH; ++i) { + assertEquals(i, in.getPos()); + assertEquals(testData[i], in.read()); + } + + assertEquals(-1, in.read()); + assertEquals(TEST_DATA_LENGTH, in.getPos()); + assertEquals(-1, in.read()); + assertEquals(TEST_DATA_LENGTH, in.getPos()); + } + } + + @Test + public void testRandomRead() throws IOException { + + MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles); + + try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) { + + for (int i = 0; i < 1000; ++i) { + int pos = random.nextInt(TEST_DATA_LENGTH); + int readLen = random.nextInt(TEST_DATA_LENGTH); + in.seek(pos); + while (--readLen > 0 && pos < TEST_DATA_LENGTH) { + assertEquals(pos, in.getPos()); + assertEquals(testData[pos++], in.read()); + } + } + + in.seek(TEST_DATA_LENGTH); + assertEquals(TEST_DATA_LENGTH, in.getPos()); + assertEquals(-1, in.read()); + + try { + in.seek(TEST_DATA_LENGTH + 1); + fail(); + } catch (Exception ignored) { + + } + } + } + + @Test + public void testEmptyList() throws IOException { + + MultiStreamStateHandle multiStreamStateHandle = + new MultiStreamStateHandle(Collections.emptyList()); + + try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) { + + assertEquals(0, in.getPos()); + in.seek(0); + assertEquals(0, in.getPos()); + assertEquals(-1, in.read()); + + try { + in.seek(1); + fail(); + } catch (Exception ignored) { + + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index 68a50d3..a3d31f5 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -310,7 +310,11 @@ trait TestingJobManagerLike extends FlinkActor { case RequestSavepoint(savepointPath) => try { - val savepoint = SavepointStore.loadSavepoint(savepointPath) + //TODO user class loader ? + val savepoint = SavepointStore.loadSavepoint( + savepointPath, + Thread.currentThread().getContextClassLoader) + sender ! ResponseSavepoint(savepoint) } catch { http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index fac37c2..54f6c10 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -640,7 +640,7 @@ public abstract class StreamTask> for (int chainIdx = 0; chainIdx < allOperators.length; ++chainIdx) { StreamOperator operator = allOperators[chainIdx]; if (null != operator) { - if (restored) { + if (restored && restoreStateHandles != null) { operator.initializeState(new OperatorStateHandles(restoreStateHandles, chainIdx)); } else { operator.initializeState(null);