flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/3] flink git commit: [FLINK-5041] Savepoint backwards compatibility 1.1 -> 1.2
Date Wed, 07 Dec 2016 13:06:29 GMT
Repository: flink
Updated Branches:
  refs/heads/master a6e80da30 -> af3bf837a


http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
new file mode 100644
index 0000000..02365c7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.memory.MemValueState;
+import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
+import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class MigrationV0ToV1Test {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	/**
+	 * Simple test of savepoint methods.
+	 */
+	@Test
+	public void testSavepointMigrationV0ToV1() throws Exception {
+
+		String target = tmp.getRoot().getAbsolutePath();
+
+		assertEquals(0, tmp.getRoot().listFiles().length);
+
+		long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
+		int numTaskStates = 4;
+		int numSubtaskStates = 16;
+
+		Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> expected =
+				createTaskStatesOld(numTaskStates, numSubtaskStates);
+
+		SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
+
+		assertEquals(SavepointV0.VERSION, savepoint.getVersion());
+		assertEquals(checkpointId, savepoint.getCheckpointId());
+		assertEquals(expected, savepoint.getOldTaskStates());
+
+		assertFalse(savepoint.getOldTaskStates().isEmpty());
+
+		Exception latestException = null;
+		Path path = null;
+		FSDataOutputStream fdos = null;
+
+		FileSystem fs = null;
+
+		try {
+
+			// Try to create a FS output stream
+			for (int attempt = 0; attempt < 10; attempt++) {
+				path = new Path(target, FileUtils.getRandomFilename("savepoint-"));
+
+				if (fs == null) {
+					fs = FileSystem.get(path.toUri());
+				}
+
+				try {
+					fdos = fs.create(path, false);
+					break;
+				} catch (Exception e) {
+					latestException = e;
+				}
+			}
+
+			if (fdos == null) {
+				throw new IOException("Failed to create file output stream at " + path, latestException);
+			}
+
+			try (DataOutputStream dos = new DataOutputStream(fdos)) {
+				dos.writeInt(SavepointStore.MAGIC_NUMBER);
+				dos.writeInt(savepoint.getVersion());
+				SavepointV0Serializer.INSTANCE.serializeOld(savepoint, dos);
+			}
+
+			ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+			Savepoint sp = SavepointStore.loadSavepoint(path.toString(), cl);
+			int t = 0;
+			for (TaskState taskState : sp.getTaskStates()) {
+				for (int p = 0; p < taskState.getParallelism(); ++p) {
+					SubtaskState subtaskState = taskState.getState(p);
+					ChainedStateHandle<StreamStateHandle> legacyOperatorState = subtaskState.getLegacyOperatorState();
+					for (int c = 0; c < legacyOperatorState.getLength(); ++c) {
+						StreamStateHandle stateHandle = legacyOperatorState.get(c);
+						try (InputStream is = stateHandle.openInputStream()) {
+							Tuple4<Integer, Integer, Integer, Integer> expTestState = new Tuple4<>(0, t, p, c);
+							Tuple4<Integer, Integer, Integer, Integer> actTestState = null;
+							//check function state
+							if (p % 4 != 0) {
+								assertEquals(1, is.read());
+								actTestState = InstantiationUtil.deserializeObject(is, cl);
+								assertEquals(expTestState, actTestState);
+							} else {
+								assertEquals(0, is.read());
+							}
+
+							//check operator state
+							expTestState.f0 = 1;
+							if (p % 3 != 0) {
+								assertEquals(1, is.read());
+								actTestState = InstantiationUtil.deserializeObject(is, cl);
+								assertEquals(expTestState, actTestState);
+							} else {
+								assertEquals(0, is.read());
+							}
+						}
+					}
+
+					//check keyed state
+					KeyGroupsStateHandle keyGroupsStateHandle = subtaskState.getManagedKeyedState();
+					if (t % 3 != 0) {
+						assertEquals(1, keyGroupsStateHandle.getNumberOfKeyGroups());
+						assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
+
+						ByteStreamStateHandle stateHandle =
+								(ByteStreamStateHandle) keyGroupsStateHandle.getDelegateStateHandle();
+						HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState =
+								MigrationInstantiationUtil.deserializeObject(stateHandle.getData(), cl);
+
+						assertEquals(2, testKeyedState.size());
+						for (KvStateSnapshot<?, ?, ?, ?> snapshot : testKeyedState.values()) {
+							MemValueState.Snapshot<?, ?, ?> castedSnapshot = (MemValueState.Snapshot<?, ?, ?>) snapshot;
+							byte[] data = castedSnapshot.getData();
+							assertEquals(t, data[0]);
+							assertEquals(p, data[1]);
+						}
+					} else {
+						assertEquals(null, keyGroupsStateHandle);
+					}
+				}
+
+				++t;
+			}
+
+			savepoint.dispose();
+
+		} finally {
+			// Dispose
+			SavepointStore.removeSavepoint(path.toString());
+		}
+	}
+
+	private static Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> createTaskStatesOld(
+			int numTaskStates, int numSubtaskStates) throws Exception {
+
+		List<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			org.apache.flink.migration.runtime.checkpoint.TaskState taskState =
+					new org.apache.flink.migration.runtime.checkpoint.TaskState(new JobVertexID(), numSubtaskStates);
+			for (int j = 0; j < numSubtaskStates; j++) {
+
+				StreamTaskState[] streamTaskStates = new StreamTaskState[2];
+
+				for (int k = 0; k < streamTaskStates.length; k++) {
+					StreamTaskState state = new StreamTaskState();
+					Tuple4<Integer, Integer, Integer, Integer> testState = new Tuple4<>(0, i, j, k);
+					if (j % 4 != 0) {
+						state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
+					}
+					testState = new Tuple4<>(1, i, j, k);
+					if (j % 3 != 0) {
+						state.setOperatorState(new SerializedStateHandle<>(testState));
+					}
+
+					if ((0 == k) && (i % 3 != 0)) {
+						HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
+						for (int l = 0; l < 2; ++l) {
+							String name = "keyed-" + l;
+							KvStateSnapshot<?, ?, ?, ?> testKeyedSnapshot =
+									new MemValueState.Snapshot<>(
+											IntSerializer.INSTANCE,
+											VoidNamespaceSerializer.INSTANCE,
+											IntSerializer.INSTANCE,
+											new ValueStateDescriptor<>(name, Integer.class, 0),
+											new byte[]{(byte) i, (byte) j});
+							testKeyedState.put(name, testKeyedSnapshot);
+						}
+						state.setKvStates(testKeyedState);
+					}
+					streamTaskStates[k] = state;
+				}
+
+				StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStates);
+				org.apache.flink.migration.util.SerializedValue<
+						org.apache.flink.migration.runtime.state.StateHandle<?>> handle =
+						new org.apache.flink.migration.util.SerializedValue<
+								org.apache.flink.migration.runtime.state.StateHandle<?>>(streamTaskStateList);
+
+				taskState.putState(j, new org.apache.flink.migration.runtime.checkpoint.SubtaskState(handle, 0, 0));
+			}
+
+			taskStates.add(taskState);
+		}
+
+		return taskStates;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index e1b83f4..67575d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -77,8 +77,10 @@ public class SavepointLoaderTest {
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 		tasks.put(vertexId, vertex);
 
+		ClassLoader ucl = Thread.currentThread().getContextClassLoader();
+
 		// 1) Load and validate: everything correct
-		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
+		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
 
 		assertEquals(jobId, loaded.getJobId());
 		assertEquals(checkpointId, loaded.getCheckpointID());
@@ -87,7 +89,7 @@ public class SavepointLoaderTest {
 		when(vertex.getMaxParallelism()).thenReturn(222);
 
 		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException expected) {
 			assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
@@ -97,13 +99,13 @@ public class SavepointLoaderTest {
 		assertNotNull(tasks.remove(vertexId));
 
 		try {
-			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, false);
+			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
 			fail("Did not throw expected Exception");
 		} catch (IllegalStateException expected) {
 			assertTrue(expected.getMessage().contains("allowNonRestoredState"));
 		}
 
 		// 4) Load and validate: ignore missing vertex
-		SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, true);
+		SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index 8eed6ea..3398341 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -64,7 +64,7 @@ public class SavepointStoreTest {
 		assertEquals(1, tmp.getRoot().listFiles().length);
 
 		// Load
-		Savepoint loaded = SavepointStore.loadSavepoint(path);
+		Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
 		assertEquals(stored, loaded);
 
 		loaded.dispose();
@@ -89,7 +89,7 @@ public class SavepointStoreTest {
 		}
 
 		try {
-			SavepointStore.loadSavepoint(filePath.toString());
+			SavepointStore.loadSavepoint(filePath.toString(), Thread.currentThread().getContextClassLoader());
 			fail("Did not throw expected Exception");
 		} catch (RuntimeException e) {
 			assertTrue(e.getMessage().contains("Flink 1.0") && e.getMessage().contains("Unexpected magic number"));
@@ -128,10 +128,10 @@ public class SavepointStoreTest {
 		assertEquals(2, tmp.getRoot().listFiles().length);
 
 		// Load
-		Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint);
+		Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader());
 		assertEquals(newSavepoint, loaded);
 
-		loaded = SavepointStore.loadSavepoint(pathSavepoint);
+		loaded = SavepointStore.loadSavepoint(pathSavepoint, Thread.currentThread().getContextClassLoader());
 		assertEquals(savepoint, loaded);
 	}
 
@@ -176,7 +176,7 @@ public class SavepointStoreTest {
 		}
 
 		@Override
-		public TestSavepoint deserialize(DataInputStream dis) throws IOException {
+		public TestSavepoint deserialize(DataInputStream dis, ClassLoader userCL) throws IOException {
 			int version = dis.readInt();
 			long checkpointId = dis.readLong();
 			return new TestSavepoint(version, checkpointId);

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
index 508a69d..58cf1aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -50,7 +49,9 @@ public class SavepointV1SerializerTest {
 
 			// Deserialize
 			ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-			Savepoint actual = serializer.deserialize(new DataInputViewStreamWrapper(bais));
+			Savepoint actual = serializer.deserialize(
+					new DataInputViewStreamWrapper(bais),
+					Thread.currentThread().getContextClassLoader());
 
 			assertEquals(expected, actual);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java
new file mode 100644
index 0000000..dd34f03
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class MultiStreamStateHandleTest {
+
+	private static final int TEST_DATA_LENGTH = 123;
+	private Random random;
+	private byte[] testData;
+	private List<StreamStateHandle> streamStateHandles;
+
+	@Before
+	public void setup() {
+		random = new Random(0x42);
+		testData = new byte[TEST_DATA_LENGTH];
+		for (int i = 0; i < testData.length; ++i) {
+			testData[i] = (byte) i;
+		}
+
+		int idx = 0;
+		streamStateHandles = new ArrayList<>();
+		while (idx < testData.length) {
+			int len = random.nextInt(5);
+			byte[] sub = Arrays.copyOfRange(testData, idx, idx + len);
+			streamStateHandles.add(new ByteStreamStateHandle(String.valueOf(idx), sub));
+			idx += len;
+		}
+	}
+
+	@Test
+	public void testMetaData() throws IOException {
+		MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles);
+		assertEquals(TEST_DATA_LENGTH, multiStreamStateHandle.getStateSize());
+	}
+
+	@Test
+	public void testLinearRead() throws IOException {
+		MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles);
+		try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) {
+
+			for (int i = 0; i < TEST_DATA_LENGTH; ++i) {
+				assertEquals(i, in.getPos());
+				assertEquals(testData[i], in.read());
+			}
+
+			assertEquals(-1, in.read());
+			assertEquals(TEST_DATA_LENGTH, in.getPos());
+			assertEquals(-1, in.read());
+			assertEquals(TEST_DATA_LENGTH, in.getPos());
+		}
+	}
+
+	@Test
+	public void testRandomRead() throws IOException {
+
+		MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles);
+
+		try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) {
+
+			for (int i = 0; i < 1000; ++i) {
+				int pos = random.nextInt(TEST_DATA_LENGTH);
+				int readLen = random.nextInt(TEST_DATA_LENGTH);
+				in.seek(pos);
+				while (--readLen > 0 && pos < TEST_DATA_LENGTH) {
+					assertEquals(pos, in.getPos());
+					assertEquals(testData[pos++], in.read());
+				}
+			}
+
+			in.seek(TEST_DATA_LENGTH);
+			assertEquals(TEST_DATA_LENGTH, in.getPos());
+			assertEquals(-1, in.read());
+
+			try {
+				in.seek(TEST_DATA_LENGTH + 1);
+				fail();
+			} catch (Exception ignored) {
+
+			}
+		}
+	}
+
+	@Test
+	public void testEmptyList() throws IOException {
+
+		MultiStreamStateHandle multiStreamStateHandle =
+				new MultiStreamStateHandle(Collections.<StreamStateHandle>emptyList());
+
+		try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) {
+
+			assertEquals(0, in.getPos());
+			in.seek(0);
+			assertEquals(0, in.getPos());
+			assertEquals(-1, in.read());
+
+			try {
+				in.seek(1);
+				fail();
+			} catch (Exception ignored) {
+
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 68a50d3..a3d31f5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -310,7 +310,11 @@ trait TestingJobManagerLike extends FlinkActor {
 
     case RequestSavepoint(savepointPath) =>
       try {
-        val savepoint = SavepointStore.loadSavepoint(savepointPath)
+        //TODO user class loader ?
+        val savepoint = SavepointStore.loadSavepoint(
+          savepointPath,
+          Thread.currentThread().getContextClassLoader)
+        
         sender ! ResponseSavepoint(savepoint)
       }
       catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/af3bf837/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fac37c2..54f6c10 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -640,7 +640,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		for (int chainIdx = 0; chainIdx < allOperators.length; ++chainIdx) {
 			StreamOperator<?> operator = allOperators[chainIdx];
 			if (null != operator) {
-				if (restored) {
+				if (restored && restoreStateHandles != null) {
 					operator.initializeState(new OperatorStateHandles(restoreStateHandles, chainIdx));
 				} else {
 					operator.initializeState(null);


Mime
View raw message