flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-6662] [errMsg] Improve error message if recovery from RetrievableStateHandles fails
Date Tue, 23 May 2017 18:29:51 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 ce685dbda -> d552b3447


[FLINK-6662] [errMsg] Improve error message if recovery from RetrievableStateHandles fails

When recovering state from a ZooKeeperStateHandleStore, deserialization can fail because
the state was written by an older Flink version which is not compatible. In this case we
should output a clearer error message so that the user can easily spot the problem.

This closes #3972.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d552b344
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d552b344
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d552b344

Branch: refs/heads/release-1.3
Commit: d552b34470748de803a999c2c4c1557c49b30045
Parents: ce685db
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue May 23 15:42:38 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue May 23 20:25:18 2017 +0200

----------------------------------------------------------------------
 .../store/ZooKeeperMesosWorkerStore.java        | 25 ++++++++++++++++----
 .../ZooKeeperCompletedCheckpointStore.java      | 11 +++++++--
 .../ZooKeeperSubmittedJobGraphStore.java        | 12 ++++++++--
 .../runtime/state/RetrievableStateHandle.java   |  3 ++-
 .../state/RetrievableStreamStateHandle.java     |  2 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  3 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  3 ++-
 .../ZooKeeperStateHandleStoreTest.java          |  2 +-
 8 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 663ce56..4544b8e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -25,12 +25,14 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
+import org.apache.flink.util.FlinkException;
 import org.apache.mesos.Protos;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -171,19 +173,32 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 			List<Tuple2<RetrievableStateHandle<Worker>, String>> handles = workersInZooKeeper.getAllAndLock();
 
-			if(handles.size() != 0) {
+			if (handles.isEmpty()) {
+				return Collections.emptyList();
+			}
+			else {
 				List<MesosWorkerStore.Worker> workers = new ArrayList<>(handles.size());
+
 				for (Tuple2<RetrievableStateHandle<Worker>, String> handle : handles) {
-					Worker worker = handle.f0.retrieveState();
+					final Worker worker;
+
+					try {
+						worker = handle.f0.retrieveState();
+					} catch (ClassNotFoundException cnfe) {
+						throw new FlinkException("Could not retrieve Mesos worker from state handle under " +
+							handle.f1 + ". This indicates that you are trying to recover from state written by an " +
+							"older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
+					} catch (IOException ioe) {
+						throw new FlinkException("Could not retrieve Mesos worker from state handle under " +
+							handle.f1 + ". This indicates that the retrieved state handle is broken. Try cleaning " +
+							"the state handle store.", ioe);
+					}
 
 					workers.add(worker);
 				}
 
 				return workers;
 			}
-			else {
-				return Collections.emptyList();
-			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 469c1b1..c4cb6bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
@@ -376,8 +377,14 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 
 		try {
 			return stateHandlePath.f0.retrieveState();
-		} catch (Exception e) {
-			throw new FlinkException("Could not retrieve checkpoint " + checkpointId + ". The state handle seems to be broken.", e);
+		} catch (ClassNotFoundException cnfe) {
+			throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " +
+				stateHandlePath.f1 + ". This indicates that you are trying to recover from state written by an " +
+				"older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
+		} catch (IOException ioe) {
+			throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " +
+				stateHandlePath.f1 + ". This indicates that the retrieved state handle is broken. Try cleaning the " +
+				"state handle store.", ioe);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index fa972ed..f31c970 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -27,10 +27,12 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.FlinkException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -182,8 +184,14 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 				try {
 					jobGraph = jobGraphRetrievableStateHandle.retrieveState();
-				} catch (Exception e) {
-					throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
+				} catch (ClassNotFoundException cnfe) {
+					throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path +
+						". This indicates that you are trying to recover from state written by an " +
+						"older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
+				} catch (IOException ioe) {
+					throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path +
+						". This indicates that the retrieved state handle is broken. Try cleaning the state handle " +
+						"store.", ioe);
 				}
 
 				addedJobGraphs.add(jobGraph.getJobId());

http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
index d547624..30ac8f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 /**
@@ -28,5 +29,5 @@ public interface RetrievableStateHandle<T extends Serializable> extends StateObj
 	/**
 	 * Retrieves the object that was previously written to state.
 	 */
-	T retrieveState() throws Exception;
+	T retrieveState() throws IOException, ClassNotFoundException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index 653e227..6ed60fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -53,7 +53,7 @@ public class RetrievableStreamStateHandle<T extends Serializable> implements
 	}
 
 	@Override
-	public T retrieveState() throws Exception {
+	public T retrieveState() throws IOException, ClassNotFoundException {
 		try (FSDataInputStream in = openInputStream()) {
 			return InstantiationUtil.deserializeObject(in, Thread.currentThread().getContextClassLoader());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 81ee4f9..77423c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -297,8 +297,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 			stateMap.put(key, state);
 		}
 
+		@SuppressWarnings("unchecked")
 		@Override
-		public T retrieveState() throws Exception {
+		public T retrieveState() {
 			return (T) stateMap.get(key);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 23cc8c8..b5854dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -39,6 +39,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -93,7 +94,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		expectedCheckpointIds.add(2L);
 
 		final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new Exception("Test exception"));
+		when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
 
 		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
 		when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index 0c215cd..fd39b25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -784,7 +784,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
 		}
 
 		@Override
-		public Long retrieveState() throws Exception {
+		public Long retrieveState() {
 			return state;
 		}
 


Mime
View raw message