flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [14/22] flink git commit: [FLINK-8531] [checkpoints] (part 5) Introduce CheckpointStorageLocationReference instead of String to communicate the location
Date Thu, 01 Feb 2018 15:46:46 GMT
[FLINK-8531] [checkpoints] (part 5) Introduce CheckpointStorageLocationReference instead of String to communicate the location


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb19e7f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb19e7f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb19e7f5

Branch: refs/heads/master
Commit: bb19e7f5278d43cd4fd265e3d2afa2fcc793ccf5
Parents: 5cc5093
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jan 19 13:37:08 2018 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 1 13:54:55 2018 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  11 +-
 .../runtime/checkpoint/CheckpointOptions.java   |  72 ++++++----
 .../api/serialization/EventSerializer.java      | 143 +++++++++++--------
 .../state/CheckpointStorageLocation.java        |  13 +-
 .../CheckpointStorageLocationReference.java     | 129 +++++++++++++++++
 .../filesystem/AbstractFsCheckpointStorage.java |  72 +++++++++-
 .../state/filesystem/FsCheckpointStorage.java   |   4 +-
 .../filesystem/FsCheckpointStorageLocation.java |  15 +-
 ...istentMetadataCheckpointStorageLocation.java |   5 +-
 ...istentMetadataCheckpointStorageLocation.java |  14 +-
 .../checkpoint/CheckpointOptionsTest.java       |  41 ++++--
 .../checkpoint/PendingCheckpointTest.java       |   5 +-
 .../api/serialization/EventSerializerTest.java  |  10 +-
 .../FsStorageLocationReferenceTest.java         |  90 ++++++++++++
 .../api/operators/AbstractStreamOperator.java   |   9 +-
 15 files changed, 485 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index bf571e6..18787f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -98,7 +98,7 @@ public class CheckpointCoordinator {
 
 	/** The executor used for asynchronous calls, like potentially blocking I/O */
 	private final Executor executor;
-	
+
 	/** Tasks who need to be sent a message when a checkpoint is started */
 	private final ExecutionVertex[] tasksToTrigger;
 
@@ -602,12 +602,9 @@ public class CheckpointCoordinator {
 				}
 				// end of lock scope
 
-				CheckpointOptions checkpointOptions;
-				if (!props.isSavepoint()) {
-					checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
-				} else {
-					checkpointOptions = CheckpointOptions.forSavepoint(checkpointStorageLocation.getLocationAsPointer());
-				}
+				final CheckpointOptions checkpointOptions = new CheckpointOptions(
+						props.getCheckpointType(),
+						checkpointStorageLocation.getLocationReference());
 
 				// send the messages to the tasks that trigger their checkpoint
 				for (Execution execution: executions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index bee2922..09d3516 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -19,12 +19,11 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 
 import java.io.Serializable;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Options for performing the checkpoint.
@@ -38,59 +37,70 @@ public class CheckpointOptions implements Serializable {
 	private static final long serialVersionUID = 5010126558083292915L;
 
 	/** Type of the checkpoint. */
-	@Nonnull
 	private final CheckpointType checkpointType;
 
 	/** Target location for the checkpoint. */
-	@Nullable
-	private final String targetLocation;
+	private final CheckpointStorageLocationReference targetLocation;
+
+	public CheckpointOptions(
+			CheckpointType checkpointType,
+			CheckpointStorageLocationReference targetLocation) {
 
-	private CheckpointOptions(
-			@Nonnull CheckpointType checkpointType,
-			@Nullable  String targetLocation) {
 		this.checkpointType = checkNotNull(checkpointType);
-		this.targetLocation = targetLocation;
+		this.targetLocation = checkNotNull(targetLocation);
 	}
 
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Returns the type of checkpoint to perform.
-	 *
-	 * @return Type of checkpoint to perform.
 	 */
-	@Nonnull
 	public CheckpointType getCheckpointType() {
 		return checkpointType;
 	}
 
 	/**
-	 * Returns a custom target location or <code>null</code> if none
-	 * was specified.
-	 *
-	 * @return A custom target location or <code>null</code>.
+	 * Returns the target location for the checkpoint.
 	 */
-	@Nullable
-	public String getTargetLocation() {
+	public CheckpointStorageLocationReference getTargetLocation() {
 		return targetLocation;
 	}
 
-	@Override
-	public String toString() {
-		return "CheckpointOptions(" + checkpointType + ")";
-	}
-
 	// ------------------------------------------------------------------------
 
-	private static final CheckpointOptions CHECKPOINT = new CheckpointOptions(CheckpointType.CHECKPOINT, null);
+	@Override
+	public int hashCode() {
+		return 31 * targetLocation.hashCode() + checkpointType.hashCode();
+	}
 
-	public static CheckpointOptions forCheckpointWithDefaultLocation() {
-		return CHECKPOINT;
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		else if (obj != null && obj.getClass() == CheckpointOptions.class) {
+			final CheckpointOptions that = (CheckpointOptions) obj;
+			return this.checkpointType == that.checkpointType &&
+					this.targetLocation.equals(that.targetLocation);
+		}
+		else {
+			return false;
+		}
 	}
 
-	public static CheckpointOptions forSavepoint(String targetDirectory) {
-		checkNotNull(targetDirectory, "targetDirectory");
-		return new CheckpointOptions(CheckpointType.SAVEPOINT, targetDirectory);
+	@Override
+	public String toString() {
+		return "CheckpointOptions: " + checkpointType + " @ " + targetLocation;
 	}
 
 	// ------------------------------------------------------------------------
+	//  Factory methods
+	// ------------------------------------------------------------------------
+
+	private static final CheckpointOptions CHECKPOINT_AT_DEFAULT_LOCATION =
+			new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
 
+	public static CheckpointOptions forCheckpointWithDefaultLocation() {
+		return CHECKPOINT_AT_DEFAULT_LOCATION;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 84a1c6a..af0bfe1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import java.nio.charset.Charset;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -31,21 +32,21 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import org.apache.flink.util.Preconditions;
 
 /**
  * Utility class to serialize and deserialize task events.
  */
 public class EventSerializer {
 
-	private static final Charset STRING_CODING_CHARSET = Charset.forName("UTF-8");
+	// ------------------------------------------------------------------------
+	//  Constants
+	// ------------------------------------------------------------------------
 
 	private static final int END_OF_PARTITION_EVENT = 0;
 
@@ -57,6 +58,12 @@ public class EventSerializer {
 
 	private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
 
+	private static final int CHECKPOINT_TYPE_CHECKPOINT = 0;
+
+	private static final int CHECKPOINT_TYPE_SAVEPOINT = 1;
+
+	// ------------------------------------------------------------------------
+	//  Serialization Logic
 	// ------------------------------------------------------------------------
 
 	public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException {
@@ -65,37 +72,7 @@ public class EventSerializer {
 			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
 		}
 		else if (eventClass == CheckpointBarrier.class) {
-			CheckpointBarrier barrier = (CheckpointBarrier) event;
-
-			CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
-			CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-
-			ByteBuffer buf;
-			if (checkpointType == CheckpointType.CHECKPOINT) {
-				buf = ByteBuffer.allocate(24);
-				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
-				buf.putLong(4, barrier.getId());
-				buf.putLong(12, barrier.getTimestamp());
-				buf.putInt(20, checkpointType.ordinal());
-			} else if (checkpointType == CheckpointType.SAVEPOINT) {
-				String targetLocation = checkpointOptions.getTargetLocation();
-				assert(targetLocation != null);
-				byte[] locationBytes = targetLocation.getBytes(STRING_CODING_CHARSET);
-
-				buf = ByteBuffer.allocate(24 + 4 + locationBytes.length);
-				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
-				buf.putLong(4, barrier.getId());
-				buf.putLong(12, barrier.getTimestamp());
-				buf.putInt(20, checkpointType.ordinal());
-				buf.putInt(24, locationBytes.length);
-				for (int i = 0; i < locationBytes.length; i++) {
-					buf.put(28 + i, locationBytes[i]);
-				}
-			} else {
-				throw new IOException("Unknown checkpoint type: " + checkpointType);
-			}
-
-			return buf;
+			return serializeCheckpointBarrier((CheckpointBarrier) event);
 		}
 		else if (eventClass == EndOfSuperstepEvent.class) {
 			return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
@@ -131,7 +108,6 @@ public class EventSerializer {
 	 * @param eventClass the expected class of the event type
 	 * @param classLoader the class loader to use for custom event classes
 	 * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
-	 * @throws IOException
 	 */
 	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader classLoader) throws IOException {
 		if (buffer.remaining() < 4) {
@@ -195,35 +171,13 @@ public class EventSerializer {
 		buffer.order(ByteOrder.BIG_ENDIAN);
 
 		try {
-			int type = buffer.getInt();
+			final int type = buffer.getInt();
 
 			if (type == END_OF_PARTITION_EVENT) {
 				return EndOfPartitionEvent.INSTANCE;
 			}
 			else if (type == CHECKPOINT_BARRIER_EVENT) {
-				long id = buffer.getLong();
-				long timestamp = buffer.getLong();
-
-				CheckpointOptions checkpointOptions;
-
-				int checkpointTypeOrdinal = buffer.getInt();
-				Preconditions.checkElementIndex(type, CheckpointType.values().length, "Illegal CheckpointType ordinal");
-				CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
-
-				if (checkpointType == CheckpointType.CHECKPOINT) {
-					checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
-				} else if (checkpointType == CheckpointType.SAVEPOINT) {
-					int len = buffer.getInt();
-					byte[] bytes = new byte[len];
-					buffer.get(bytes);
-					String targetLocation = new String(bytes, STRING_CODING_CHARSET);
-
-					checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
-				} else {
-					throw new IOException("Unknown checkpoint type: " + checkpointType);
-				}
-
-				return new CheckpointBarrier(id, timestamp, checkpointOptions);
+				return deserializeCheckpointBarrier(buffer);
 			}
 			else if (type == END_OF_SUPERSTEP_EVENT) {
 				return EndOfSuperstepEvent.INSTANCE;
@@ -257,7 +211,7 @@ public class EventSerializer {
 				catch (Exception e) {
 					throw new IOException("Error while deserializing or instantiating event.", e);
 				}
-			} 
+			}
 			else {
 				throw new IOException("Corrupt byte stream for event");
 			}
@@ -267,6 +221,70 @@ public class EventSerializer {
 		}
 	}
 
+	private static ByteBuffer serializeCheckpointBarrier(CheckpointBarrier barrier) throws IOException {
+		final CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
+		final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+
+		final byte[] locationBytes = checkpointOptions.getTargetLocation().isDefaultReference() ?
+				null : checkpointOptions.getTargetLocation().getReferenceBytes();
+
+		final ByteBuffer buf = ByteBuffer.allocate(28 + (locationBytes == null ? 0 : locationBytes.length));
+
+		// we do not use checkpointType.ordinal() here to make the serialization robust
+		// against changes in the enum (such as changes in the order of the values)
+		final int typeInt;
+		if (checkpointType == CheckpointType.CHECKPOINT) {
+			typeInt = CHECKPOINT_TYPE_CHECKPOINT;
+		} else if (checkpointType == CheckpointType.SAVEPOINT) {
+			typeInt = CHECKPOINT_TYPE_SAVEPOINT;
+		} else {
+			throw new IOException("Unknown checkpoint type: " + checkpointType);
+		}
+
+		buf.putInt(CHECKPOINT_BARRIER_EVENT);
+		buf.putLong(barrier.getId());
+		buf.putLong(barrier.getTimestamp());
+		buf.putInt(typeInt);
+
+		if (locationBytes == null) {
+			buf.putInt(-1);
+		} else {
+			buf.putInt(locationBytes.length);
+			buf.put(locationBytes);
+		}
+
+		buf.flip();
+		return buf;
+	}
+
+	private static CheckpointBarrier deserializeCheckpointBarrier(ByteBuffer buffer) throws IOException {
+		final long id = buffer.getLong();
+		final long timestamp = buffer.getLong();
+
+		final int checkpointTypeCode = buffer.getInt();
+		final int locationRefLen = buffer.getInt();
+
+		final CheckpointType checkpointType;
+		if (checkpointTypeCode == CHECKPOINT_TYPE_CHECKPOINT) {
+			checkpointType = CheckpointType.CHECKPOINT;
+		} else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT) {
+			checkpointType = CheckpointType.SAVEPOINT;
+		} else {
+			throw new IOException("Unknown checkpoint type code: " + checkpointTypeCode);
+		}
+
+		final CheckpointStorageLocationReference locationRef;
+		if (locationRefLen == -1) {
+			locationRef = CheckpointStorageLocationReference.getDefault();
+		} else {
+			byte[] bytes = new byte[locationRefLen];
+			buffer.get(bytes);
+			locationRef = new CheckpointStorageLocationReference(bytes);
+		}
+
+		return new CheckpointBarrier(id, timestamp, new CheckpointOptions(checkpointType, locationRef));
+	}
+
 	// ------------------------------------------------------------------------
 	// Buffer helpers
 	// ------------------------------------------------------------------------
@@ -293,7 +311,6 @@ public class EventSerializer {
 	 * @param eventClass the expected class of the event type
 	 * @param classLoader the class loader to use for custom event classes
 	 * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
-	 * @throws IOException
 	 */
 	public static boolean isEvent(final Buffer buffer,
 		final Class<?> eventClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
index fbc4805..aeb4b14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
@@ -53,13 +53,12 @@ public interface CheckpointStorageLocation {
 	void disposeOnFailure() throws IOException;
 
 	/**
-	 * Gets the location encoded as a string pointer.
+	 * Gets a reference to the storage location. This reference is sent to the
+	 * target storage location via checkpoint RPC messages and checkpoint barriers,
+	 * in a format avoiding backend-specific classes.
 	 *
-	 * <p>This pointer is used to send the target storage location via checkpoint RPC messages
-	 * and checkpoint barriers, in a format avoiding backend-specific classes.
-	 *
-	 * <p>That string encodes the location typically in a backend-specific way.
-	 * For example, file-based backends can encode paths here.
+	 * <p>If there is no custom location information that needs to be communicated,
+	 * this method can simply return {@link CheckpointStorageLocationReference#getDefault()}.
 	 */
-	String getLocationAsPointer();
+	CheckpointStorageLocationReference getLocationReference();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocationReference.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocationReference.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocationReference.java
new file mode 100644
index 0000000..58d08c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocationReference.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.ObjectStreamException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A reference to a storage location. This is a wrapper around an array of bytes that
+ * are subject to interpretation by the state backend's storage locations (similar as
+ * a serializer needs to interpret byte streams). There is special handling for a
+ * 'default location', which can be used as an optimization by state backends, when no
+ * extra information is needed to determine where the checkpoints should be stored
+ * (all information can be derived from the configuration and the checkpoint id).
+ *
+ * <h3>Why is this simply a byte array?</h3>
+ *
+ * <p>The reference is represented via raw bytes, which are subject to interpretation
+ * by the state backends. We did not add any more typing and serialization abstraction
+ * in between, because these types need to serialize/deserialize fast in between
+ * network streams (byte buffers) and barriers. We may ultimately add some more typing
+ * if we simply keep the byte buffers for the checkpoint barriers and forward them,
+ * thus saving decoding and re-encoding these references repeatedly.
+ */
+public class CheckpointStorageLocationReference implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The encoded location reference. null indicates the default location. */
+	private final byte[] encodedReference;
+
+	/**
+	 * Creates a new location reference.
+	 *
+	 * @param encodedReference The location reference, represented as bytes (non null)
+	 */
+	public CheckpointStorageLocationReference(byte[] encodedReference) {
+		checkNotNull(encodedReference);
+		checkArgument(encodedReference.length > 0);
+
+		this.encodedReference = encodedReference;
+	}
+
+	/**
+	 * Private constructor for singleton only.
+	 */
+	private CheckpointStorageLocationReference() {
+		this.encodedReference = null;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the reference bytes.
+	 *
+	 * <p><b>Important:</b> For efficiency, this method does not make a defensive copy,
+	 * so the caller must not modify the bytes in the array.
+	 */
+	public byte[] getReferenceBytes() {
+		// return a non null object always
+		return encodedReference != null ? encodedReference : new byte[0];
+	}
+
+	/**
+	 * Returns true, if this object is the default reference.
+	 */
+	public boolean isDefaultReference() {
+		return encodedReference == null;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return encodedReference == null ? 2059243550 : Arrays.hashCode(encodedReference);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj == this ||
+				obj != null && obj.getClass() == CheckpointStorageLocationReference.class &&
+						Arrays.equals(encodedReference, ((CheckpointStorageLocationReference) obj).encodedReference);
+	}
+
+	@Override
+	public String toString() {
+		return encodedReference == null ? "(default)"
+				: StringUtils.byteToHexString(encodedReference, 0, encodedReference.length);
+	}
+
+	/**
+	 * readResolve() preserves the singleton property of the default value.
+ 	 */
+	protected final Object readResolve() throws ObjectStreamException {
+		return encodedReference == null ? DEFAULT : this;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Default Location Reference
+	// ------------------------------------------------------------------------
+
+	/** The singleton object for the default reference. */
+	private static final CheckpointStorageLocationReference DEFAULT = new CheckpointStorageLocationReference();
+
+	public static CheckpointStorageLocationReference getDefault() {
+		return DEFAULT;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
index 1254ee2..b344cfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.FileUtils;
 
@@ -30,6 +31,7 @@ import javax.annotation.Nullable;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -55,6 +57,9 @@ public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
 	/** The name of the metadata files in checkpoints / savepoints. */
 	public static final String METADATA_FILE_NAME = "_metadata";
 
+	/** The magic number that is put in front of any reference. */
+	private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };
+
 	// ------------------------------------------------------------------------
 	//  Fields and properties
 	// ------------------------------------------------------------------------
@@ -147,7 +152,12 @@ public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
 
 			try {
 				if (fs.mkdirs(path)) {
-					return new FsCheckpointStorageLocation(fs, path, path, path);
+					// we make the path qualified, to make it independent of default schemes and authorities
+					final Path qp = path.makeQualified(fs);
+
+					final CheckpointStorageLocationReference reference = encodePathAsReference(qp);
+
+					return new FsCheckpointStorageLocation(fs, qp, qp, qp, reference);
 				}
 			} catch (Exception e) {
 				latestException = e;
@@ -253,4 +263,64 @@ public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
 
 		return new FileStateHandle(metadataFileStatus.getPath(), metadataFileStatus.getLen());
 	}
+
+	// ------------------------------------------------------------------------
+	//  Encoding / Decoding of References
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Encodes the given path as a reference in bytes. The path is encoded as a UTF-8 string
+	 * and prepended as a magic number.
+	 *
+	 * @param path The path to encode.
+	 * @return The location reference.
+	 */
+	public static CheckpointStorageLocationReference encodePathAsReference(Path path) {
+		byte[] refBytes = path.toString().getBytes(StandardCharsets.UTF_8);
+		byte[] bytes = new byte[REFERENCE_MAGIC_NUMBER.length + refBytes.length];
+
+		System.arraycopy(REFERENCE_MAGIC_NUMBER, 0, bytes, 0, REFERENCE_MAGIC_NUMBER.length);
+		System.arraycopy(refBytes, 0, bytes, REFERENCE_MAGIC_NUMBER.length, refBytes.length);
+
+		return new CheckpointStorageLocationReference(bytes);
+	}
+
+	/**
+	 * Decodes the given reference into a path. This method validates that the reference bytes start with
+	 * the correct magic number (as written by {@link #encodePathAsReference(Path)}) and converts
+	 * the remaining bytes back to a proper path.
+	 *
+	 * @param reference The bytes representing the reference.
+	 * @return The path decoded from the reference.
+	 *
+	 * @throws IllegalArgumentException Thrown, if the bytes do not represent a proper reference.
+	 */
+	public static Path decodePathFromReference(CheckpointStorageLocationReference reference) {
+		if (reference.isDefaultReference()) {
+			throw new IllegalArgumentException("Cannot decode default reference");
+		}
+
+		final byte[] bytes = reference.getReferenceBytes();
+		final int headerLen = REFERENCE_MAGIC_NUMBER.length;
+
+		if (bytes.length > headerLen) {
+			// compare magic number
+			for (int i = 0; i < headerLen; i++) {
+				if (bytes[i] != REFERENCE_MAGIC_NUMBER[i]) {
+					throw new IllegalArgumentException("Reference starts with the wrong magic number");
+				}
+			}
+
+			// covert to string and path
+			try {
+				return new Path(new String(bytes, headerLen, bytes.length - headerLen, StandardCharsets.UTF_8));
+			}
+			catch (Exception e) {
+				throw new IllegalArgumentException("Reference cannot be decoded to a path", e);
+			}
+		}
+		else {
+			throw new IllegalArgumentException("Reference too short.");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index b7be8fa..08159d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 
 import javax.annotation.Nullable;
 
@@ -80,6 +81,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 		fileSystem.mkdirs(checkpointDir);
 
 		return new FsCheckpointStorageLocation(
-						fileSystem, checkpointDir, sharedStateDirectory, taskOwnedStateDirectory);
+						fileSystem, checkpointDir, sharedStateDirectory, taskOwnedStateDirectory,
+						CheckpointStorageLocationReference.getDefault());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
index 829ab9a..9410287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.filesystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 
 import java.io.IOException;
@@ -42,22 +43,22 @@ public class FsCheckpointStorageLocation implements CheckpointStorageLocation {
 
 	private final Path metadataFilePath;
 
-	private final String qualifiedCheckpointDirectory;
+	private final CheckpointStorageLocationReference reference;
 
 	public FsCheckpointStorageLocation(
 			FileSystem fileSystem,
 			Path checkpointDir,
 			Path sharedStateDir,
-			Path taskOwnedStateDir) {
+			Path taskOwnedStateDir,
+			CheckpointStorageLocationReference reference) {
 
 		this.fileSystem = checkNotNull(fileSystem);
 		this.checkpointDirectory = checkNotNull(checkpointDir);
 		this.sharedStateDirectory = checkNotNull(sharedStateDir);
 		this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
+		this.reference = checkNotNull(reference);
 
 		this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
-
-		this.qualifiedCheckpointDirectory = checkpointDir.makeQualified(fileSystem).toString();
 	}
 
 	// ------------------------------------------------------------------------
@@ -91,7 +92,7 @@ public class FsCheckpointStorageLocation implements CheckpointStorageLocation {
 
 	@Override
 	public String markCheckpointAsFinished() throws IOException {
-		return qualifiedCheckpointDirectory;
+		return checkpointDirectory.toString();
 	}
 
 	@Override
@@ -102,8 +103,8 @@ public class FsCheckpointStorageLocation implements CheckpointStorageLocation {
 	}
 
 	@Override
-	public String getLocationAsPointer() {
-		return qualifiedCheckpointDirectory;
+	public CheckpointStorageLocationReference getLocationReference() {
+		return reference;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
index 3baa319..fb8bd7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
 
@@ -50,7 +51,7 @@ public class NonPersistentMetadataCheckpointStorageLocation implements Checkpoin
 	public void disposeOnFailure() {}
 
 	@Override
-	public String getLocationAsPointer() {
-		return PersistentMetadataCheckpointStorageLocation.LOCATION_POINTER;
+	public CheckpointStorageLocationReference getLocationReference() {
+		return CheckpointStorageLocationReference.getDefault();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
index 5f4b954..b6a8635 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
 
 /**
@@ -33,10 +34,6 @@ import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
  */
 public class PersistentMetadataCheckpointStorageLocation extends FsCheckpointStorageLocation {
 
-	/** The internal pointer for the {@link MemoryStateBackend}'s storage location (data inline with
-	 * state handles) that gets sent to the TaskManagers to describe this storage. */
-	static final String LOCATION_POINTER = "(embedded)";
-
 	/**
 	 * Creates a checkpoint storage persists metadata to a file system and stores state
 	 * in line in state handles with the metadata.
@@ -45,14 +42,7 @@ public class PersistentMetadataCheckpointStorageLocation extends FsCheckpointSto
 	 * @param checkpointDir The directory where the checkpoint metadata will be written.
 	 */
 	public PersistentMetadataCheckpointStorageLocation(FileSystem fileSystem, Path checkpointDir) {
-		super(fileSystem, checkpointDir, checkpointDir, checkpointDir);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String getLocationAsPointer() {
-		return LOCATION_POINTER;
+		super(fileSystem, checkpointDir, checkpointDir, checkpointDir, CheckpointStorageLocationReference.getDefault());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
index 43ff20d..5a37631 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -18,30 +18,45 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 
 import org.junit.Test;
 
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link CheckpointOptions} class.
+ */
 public class CheckpointOptionsTest {
 
 	@Test
-	public void testFullCheckpoint() throws Exception {
-		CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
+	public void testDefaultCheckpoint() throws Exception {
+		final CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
 		assertEquals(CheckpointType.CHECKPOINT, options.getCheckpointType());
-		assertNull(options.getTargetLocation());
+		assertTrue(options.getTargetLocation().isDefaultReference());
+
+		final CheckpointOptions copy = CommonTestUtils.createCopySerializable(options);
+		assertEquals(CheckpointType.CHECKPOINT, copy.getCheckpointType());
+		assertTrue(copy.getTargetLocation().isDefaultReference());
 	}
 
 	@Test
 	public void testSavepoint() throws Exception {
-		String location = "asdasdadasdasdja7931481398123123123kjhasdkajsd";
-		CheckpointOptions options = CheckpointOptions.forSavepoint(location);
-		assertEquals(CheckpointType.SAVEPOINT, options.getCheckpointType());
-		assertEquals(location, options.getTargetLocation());
-	}
+		final Random rnd = new Random();
+		final byte[] locationBytes = new byte[rnd.nextInt(42)];
+		rnd.nextBytes(locationBytes);
+
+		final CheckpointOptions options = new CheckpointOptions(
+				CheckpointType.values()[rnd.nextInt(CheckpointType.values().length)],
+				new CheckpointStorageLocationReference(locationBytes));
 
-	@Test(expected = NullPointerException.class)
-	public void testSavepointNullCheck() throws Exception {
-		CheckpointOptions.forSavepoint(null);
+		final CheckpointOptions copy = CommonTestUtils.createCopySerializable(options);
+		assertEquals(options.getCheckpointType(), copy.getCheckpointType());
+		assertArrayEquals(locationBytes, copy.getTargetLocation().getReferenceBytes());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 284a4b1..904b533 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
 
@@ -332,7 +333,9 @@ public class PendingCheckpointTest {
 
 		final Path checkpointDir = new Path(tmpFolder.newFolder().toURI());
 		final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
-				LocalFileSystem.getSharedInstance(), checkpointDir, checkpointDir, checkpointDir);
+				LocalFileSystem.getSharedInstance(),
+				checkpointDir, checkpointDir, checkpointDir,
+				CheckpointStorageLocationReference.getDefault());
 
 		final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 0801eee..62d6aa5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -33,6 +34,8 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.junit.Test;
 
 public class EventSerializerTest {
@@ -45,7 +48,10 @@ public class EventSerializerTest {
 		CheckpointOptions checkpoint = CheckpointOptions.forCheckpointWithDefaultLocation();
 		testCheckpointBarrierSerialization(id, timestamp, checkpoint);
 
-		CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+		final byte[] reference = new byte[] { 15, 52, 52, 11, 0, 0, 0, 0, -1, -23, -19, 35 };
+
+		CheckpointOptions savepoint = new CheckpointOptions(
+				CheckpointType.SAVEPOINT, new CheckpointStorageLocationReference(reference));
 		testCheckpointBarrierSerialization(id, timestamp, savepoint);
 	}
 
@@ -148,7 +154,7 @@ public class EventSerializerTest {
 	 * @throws IOException
 	 */
 	private boolean checkIsEvent(
-			AbstractEvent event, 
+			AbstractEvent event,
 			Class<? extends AbstractEvent> eventClass) throws IOException {
 
 		final Buffer serializedEvent = EventSerializer.toBuffer(event);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStorageLocationReferenceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStorageLocationReferenceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStorageLocationReferenceTest.java
new file mode 100644
index 0000000..b0bad64
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStorageLocationReferenceTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.decodePathFromReference;
+import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.encodePathAsReference;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the encoding / decoding of storage location references.
+ */
+public class FsStorageLocationReferenceTest extends TestLogger {
+
+	@Test
+	public void testEncodeAndDecode() throws Exception {
+		final Path path = randomPath(new Random());
+
+		try {
+			CheckpointStorageLocationReference ref = encodePathAsReference(path);
+			Path decoded = decodePathFromReference(ref);
+
+			assertEquals(path, decoded);
+		}
+		catch (Exception | Error e) {
+			// if something goes wrong, help by printing the problematic path
+			log.error("ERROR FOR PATH " + path);
+			throw e;
+		}
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDecodingTooShortReference() {
+		decodePathFromReference(new CheckpointStorageLocationReference(new byte[2]));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDecodingGarbage() {
+		final byte[] bytes = new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C };
+		decodePathFromReference(new CheckpointStorageLocationReference(bytes));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDecodingDefaultReference() {
+		decodePathFromReference(CheckpointStorageLocationReference.getDefault());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static Path randomPath(Random rnd) {
+		final StringBuilder path = new StringBuilder();
+
+		// scheme
+		path.append(StringUtils.getRandomString(rnd, 1, 5, 'a', 'z'));
+		path.append("://");
+		path.append(StringUtils.getRandomString(rnd, 10, 20)); // authority
+		path.append(rnd.nextInt(50000) + 1); // port
+
+		for (int i = rnd.nextInt(5) + 1; i > 0; i--) {
+			path.append('/');
+			path.append(StringUtils.getRandomString(rnd, 3, 15));
+		}
+
+		return new Path(path.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb19e7f5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index be7e784..c6bc144 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.Map;
@@ -468,7 +469,13 @@ public abstract class AbstractStreamOperator<OUT>
 		if (checkpointType == CheckpointType.CHECKPOINT) {
 			return checkpointStreamFactory;
 		} else if (checkpointType == CheckpointType.SAVEPOINT) {
-			return container.createSavepointStreamFactory(this, checkpointOptions.getTargetLocation());
+
+			// temporary fix: hard-code back conversion of the location reference to a string
+			String targetAsString = new String(
+					checkpointOptions.getTargetLocation().getReferenceBytes(),
+					StandardCharsets.UTF_8);
+
+			return container.createSavepointStreamFactory(this, targetAsString);
 		} else {
 			throw new IllegalStateException("Unknown checkpoint type " + checkpointType);
 		}


Mime
View raw message