flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-4218] [checkpoints] Do not rely on FileSystem to determine state sizes
Date Mon, 26 Sep 2016 12:11:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master 28ff5a3c9 -> 95e9004e3


[FLINK-4218] [checkpoints] Do not rely on FileSystem to determine state sizes

This prevents failures on eventually consistent S3, where operations on
keys (= entries in the parent directory/bucket) are not guaranteed to be immediately
consistent (visible) after a blob has been written.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95e9004e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95e9004e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95e9004e

Branch: refs/heads/master
Commit: 95e9004e36fffae755eab7aa3d5d0d5e8bfb7113
Parents: 6f237cf
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Sep 23 15:16:27 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Sep 26 14:11:05 2016 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java |  2 +-
 .../flink/runtime/checkpoint/TaskState.java     |  2 +-
 .../savepoint/SavepointV1Serializer.java        |  6 ++--
 .../flink/runtime/state/ChainedStateHandle.java |  2 +-
 .../runtime/state/KeyGroupsStateHandle.java     |  2 +-
 .../state/RetrievableStreamStateHandle.java     |  9 +++---
 .../apache/flink/runtime/state/StateObject.java |  6 ++--
 .../state/filesystem/FileStateHandle.java       | 32 ++++++++++++--------
 .../filesystem/FsCheckpointStreamFactory.java   |  9 +++++-
 .../FileSystemStateStorageHelper.java           | 15 +++------
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  2 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java | 18 +++--------
 .../state/AbstractCloseableHandleTest.java      |  6 ++--
 .../tasks/InterruptSensitiveRestoreTest.java    |  3 +-
 14 files changed, 58 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index e412006..7cb3916 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -108,7 +108,7 @@ public class CompletedCheckpoint implements StateObject {
 	}
 
 	@Override
-	public long getStateSize() throws Exception {
+	public long getStateSize() throws IOException {
 		long result = 0L;
 
 		for (TaskState taskState : taskStates.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 9025090..657dd60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -152,7 +152,7 @@ public class TaskState implements StateObject {
 
 
 	@Override
-	public long getStateSize() throws Exception {
+	public long getStateSize() throws IOException {
 		long result = 0L;
 
 		for (int i = 0; i < parallelism; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 8e05b81..f07f44f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -197,6 +197,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1>
{
 		} else if (stateHandle instanceof FileStateHandle) {
 			dos.writeByte(FILE_STREAM_STATE_HANDLE);
 			FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
+			dos.writeLong(stateHandle.getStateSize());
 			dos.writeUTF(fileStateHandle.getFilePath().toString());
 
 		} else if (stateHandle instanceof ByteStreamStateHandle) {
@@ -218,12 +219,13 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1>
{
 		if (NULL_HANDLE == type) {
 			return null;
 		} else if (FILE_STREAM_STATE_HANDLE == type) {
+			long size = dis.readLong();
 			String pathString = dis.readUTF();
-			return new FileStateHandle(new Path(pathString));
+			return new FileStateHandle(new Path(pathString), size);
 		} else if (BYTE_STREAM_STATE_HANDLE == type) {
 			int numBytes = dis.readInt();
 			byte[] data = new byte[numBytes];
-			dis.read(data);
+			dis.readFully(data);
 			return new ByteStreamStateHandle(data);
 		} else {
 			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
index 9b308a3..74057ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -85,7 +85,7 @@ public class ChainedStateHandle<T extends StateObject> implements
StateObject {
 	}
 
 	@Override
-	public long getStateSize() throws Exception {
+	public long getStateSize() throws IOException {
 		long sumStateSize = 0;
 
 		if (operatorStateHandles != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 0a36f92..7f87e86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -118,7 +118,7 @@ public class KeyGroupsStateHandle implements StateObject {
 	}
 
 	@Override
-	public long getStateSize() throws Exception {
+	public long getStateSize() throws IOException {
 		return stateHandle.getStateSize();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index c6fd02c..9ecc4c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -41,6 +39,7 @@ public class RetrievableStreamStateHandle<T extends Serializable>
implements
 		StreamStateHandle, RetrievableStateHandle<T>, Closeable {
 
 	private static final long serialVersionUID = 314567453677355L;
+
 	/** wrapped inner stream state handle from which we deserialize on retrieval */
 	private final StreamStateHandle wrappedStreamStateHandle;
 
@@ -48,9 +47,9 @@ public class RetrievableStreamStateHandle<T extends Serializable>
implements
 		this.wrappedStreamStateHandle = Preconditions.checkNotNull(streamStateHandle);
 	}
 
-	public RetrievableStreamStateHandle(Path filePath) {
+	public RetrievableStreamStateHandle(Path filePath, long stateSize) {
 		Preconditions.checkNotNull(filePath);
-		this.wrappedStreamStateHandle = new FileStateHandle(filePath);
+		this.wrappedStreamStateHandle = new FileStateHandle(filePath, stateSize);
 	}
 
 	@Override
@@ -71,7 +70,7 @@ public class RetrievableStreamStateHandle<T extends Serializable>
implements
 	}
 
 	@Override
-	public long getStateSize() throws Exception {
+	public long getStateSize() throws IOException {
 		return wrappedStreamStateHandle.getStateSize();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index 47103c1..4c65318 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import java.io.IOException;
+
 /**
  * Base of all types that represent checkpointed state. Specializations are for
  * example {@link StateHandle StateHandles} (directly resolve to state).
@@ -47,7 +49,7 @@ public interface StateObject extends java.io.Closeable, java.io.Serializable
{
 	 * <p>If the the size is not known, return {@code 0}.
 	 *
 	 * @return Size of the state in bytes.
-	 * @throws Exception If the operation fails during size retrieval.
+	 * @throws IOException If the operation fails during size retrieval.
 	 */
-	long getStateSize() throws Exception;
+	long getStateSize() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 5ae751b..f361263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -26,7 +26,9 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.IOException;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 
 /**
  * {@link StreamStateHandle} for state that was written to a file stream. The written data
is
@@ -36,14 +38,13 @@ public class FileStateHandle extends AbstractCloseableHandle implements
StreamSt
 
 	private static final long serialVersionUID = 350284443258002355L;
 
-	/**
-	 * The path to the file in the filesystem, fully describing the file system
-	 */
+	/** The path to the file in the filesystem, fully describing the file system */
 	private final Path filePath;
 
-	/**
-	 * Cached file system handle
-	 */
+	/** The size of the state in the file */
+	private final long stateSize;
+
+	/** Cached file system handle */
 	private transient FileSystem fs;
 
 	/**
@@ -51,8 +52,10 @@ public class FileStateHandle extends AbstractCloseableHandle implements
StreamSt
 	 *
 	 * @param filePath The path to the file that stores the state.
 	 */
-	public FileStateHandle(Path filePath) {
-		this.filePath = requireNonNull(filePath);
+	public FileStateHandle(Path filePath, long stateSize) {
+		checkArgument(stateSize >= -1);
+		this.filePath = checkNotNull(filePath);
+		this.stateSize = stateSize;
 	}
 
 	/**
@@ -86,8 +89,7 @@ public class FileStateHandle extends AbstractCloseableHandle implements
StreamSt
 		// fail (and be ignored) when some files still exist
 		try {
 			getFileSystem().delete(filePath.getParent(), false);
-		} catch (IOException ignored) {
-		}
+		} catch (IOException ignored) {}
 	}
 
 	/**
@@ -98,7 +100,7 @@ public class FileStateHandle extends AbstractCloseableHandle implements
StreamSt
 	 */
 	@Override
 	public long getStateSize() throws IOException {
-		return getFileSystem().getFileStatus(filePath).getLen();
+		return stateSize;
 	}
 
 	/**
@@ -114,6 +116,7 @@ public class FileStateHandle extends AbstractCloseableHandle implements
StreamSt
 		return fs;
 	}
 
+	// ------------------------------------------------------------------------
 
 	@Override
 	public boolean equals(Object o) {
@@ -133,4 +136,9 @@ public class FileStateHandle extends AbstractCloseableHandle implements
StreamSt
 	public int hashCode() {
 		return filePath.hashCode();
 	}
+
+	@Override
+	public String toString() {
+		return String.format("File State: %s [%d bytes]", filePath, stateSize);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index c027558..e4f7eba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -301,9 +301,16 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory
{
 					}
 					else {
 						flush();
+
+						long size = -1;
+						// make a best effort attempt to figure out the size
+						try {
+							size = outStream.getPos();
+						} catch (Exception ignored) {}
+						
 						outStream.close();
 						closed = true;
-						return new FileStateHandle(statePath);
+						return new FileStateHandle(statePath, size);
 					}
 				}
 				else {

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
index a534b40..7658afb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -25,10 +25,10 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 /**
@@ -61,19 +61,14 @@ public class FileSystemStateStorageHelper<T extends Serializable>
implements Ret
 
 		for (int attempt = 0; attempt < 10; attempt++) {
 			Path filePath = getNewFilePath();
-			FSDataOutputStream outStream;
-			try {
-				outStream = fs.create(filePath, false);
+
+			try (FSDataOutputStream outStream = fs.create(filePath, false)) {
+				InstantiationUtil.serializeObject(outStream, state);
+				return new RetrievableStreamStateHandle<T>(filePath, outStream.getPos());
 			}
 			catch (Exception e) {
 				latestException = e;
-				continue;
-			}
-
-			try(ObjectOutputStream os = new ObjectOutputStream(outStream)) {
-				os.writeObject(state);
 			}
-			return new RetrievableStreamStateHandle<T>(filePath);
 		}
 
 		throw new Exception("Could not open output stream for state backend", latestException);

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index f273797..6a8d072 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -183,7 +183,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		}
 
 		@Override
-		public long getStateSize() throws Exception {
+		public long getStateSize() throws IOException {
 			return 0;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index c513e26..504143b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -334,7 +334,7 @@ public class SimpleCheckpointStatsTrackerTest {
 					StreamStateHandle proxy = new StateHandleProxy(new Path(), proxySize);
 
 					SubtaskState subtaskState = new SubtaskState(
-						new ChainedStateHandle<>(Arrays.asList(proxy)),
+						new ChainedStateHandle<>(Collections.singletonList(proxy)),
 						duration);
 
 					taskState.putState(subtaskIndex, subtaskState);
@@ -371,21 +371,11 @@ public class SimpleCheckpointStatsTrackerTest {
 
 		private static final long serialVersionUID = 35356735683568L;
 
-		public StateHandleProxy(Path filePath, long proxySize) {
-			super(filePath);
-			this.proxySize = proxySize;
-		}
-
-		private long proxySize;
-
-		@Override
-		public void discardState() throws Exception {
-
+		public StateHandleProxy(Path filePath, long size) {
+			super(filePath, size);
 		}
 
 		@Override
-		public long getStateSize() {
-			return proxySize;
-		}
+		public void discardState() {}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
index 40e1852..e613105 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
@@ -87,12 +87,10 @@ public class AbstractCloseableHandleTest {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void discardState() throws Exception {
-
-		}
+		public void discardState() {}
 
 		@Override
-		public long getStateSize() throws Exception {
+		public long getStateSize() {
 			return 0;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/95e9004e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 47f1bd5..9f52e9c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -198,6 +198,7 @@ public class InterruptSensitiveRestoreTest {
 			// an interrupt on a waiting object leads to an infinite loop
 			try {
 				synchronized (this) {
+					//noinspection WaitNotInLoop
 					wait();
 				}
 			}
@@ -216,7 +217,7 @@ public class InterruptSensitiveRestoreTest {
 		public void discardState() throws Exception {}
 
 		@Override
-		public long getStateSize() throws Exception {
+		public long getStateSize() throws IOException {
 			return 0;
 		}
 	}


Mime
View raw message