flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-5290] Ensure backwards compatibility of the hashes used to generate JobVertexIds
Date Wed, 14 Dec 2016 11:20:30 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8a7288ea9 -> 6cfc841b5


[FLINK-5290] Ensure backwards compatibility of the hashes used to generate JobVertexIds


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cfc841b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cfc841b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cfc841b

Branch: refs/heads/master
Commit: 6cfc841b5f5829593cf2993bf3493f9ec42657e6
Parents: 8a7288e
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Thu Dec 8 14:03:47 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Dec 14 10:53:57 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/StateAssignmentOperation.java    |  14 +-
 .../checkpoint/savepoint/SavepointLoader.java   |  12 +
 .../executiongraph/ExecutionJobVertex.java      |  25 ++
 .../runtime/jobgraph/InputFormatVertex.java     |   6 +
 .../flink/runtime/jobgraph/JobVertex.java       |  24 ++
 .../executiongraph/LegacyJobVertexIdTest.java   |  77 +++++
 .../api/graph/StreamGraphHasherV1.java          | 293 +++++++++++++++++
 .../streaming/api/graph/StreamGraphHasher.java  |  36 +++
 .../api/graph/StreamGraphHasherV2.java          | 315 +++++++++++++++++++
 .../flink/streaming/api/graph/StreamNode.java   |  10 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 302 +++---------------
 11 files changed, 841 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index f496a07..61a71e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -62,10 +62,22 @@ public class StateAssignmentOperation {
 
 	public boolean assignStates() throws Exception {
 
+		boolean expandedToLegacyIds = false;
+		Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
+
 		for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
 
 			TaskState taskState = taskGroupStateEntry.getValue();
-			ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
+			ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
+
+			// The first time we cannot find the execution job vertex for an id, we also consider alternative ids,
+			// for example those generated by older Flink versions, to provide backwards compatibility.
+			if (executionJobVertex == null && !expandedToLegacyIds) {
+				localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
+				executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
+				expandedToLegacyIds = true;
+				logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
+			}
 
 			if (executionJobVertex == null) {
 				if (allowNonRestoredState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 172e425..d6be482 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -64,10 +64,22 @@ public class SavepointLoader {
 		Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath, userClassLoader);
 		final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
 
+		boolean expandedToLegacyIds = false;
+
 		// (2) validate it (parallelism, etc)
 		for (TaskState taskState : savepoint.getTaskStates()) {
+
 			ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
 
+			// The first time we cannot find the execution job vertex for an id, we also consider alternative ids,
+			// for example those generated by older Flink versions, to provide backwards compatibility.
+			if (executionJobVertex == null && !expandedToLegacyIds) {
+				tasks = ExecutionJobVertex.includeLegacyJobVertexIDs(tasks);
+				executionJobVertex = tasks.get(taskState.getJobVertexID());
+				expandedToLegacyIds = true;
+				LOG.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
+			}
+
 			if (executionJobVertex != null) {
 				if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()) {
 					taskStates.put(taskState.getJobVertexID(), taskState);

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index c066ca8..7f2545c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -617,6 +617,31 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		}
 	}
 
+	public static Map<JobVertexID, ExecutionJobVertex> includeLegacyJobVertexIDs(
+			Map<JobVertexID, ExecutionJobVertex> tasks) {
+		// Returns a new map (the given map is left unmodified) keyed by primary AND alternative ids.
+		Map<JobVertexID, ExecutionJobVertex> expanded = new HashMap<>(2 * tasks.size());
+		// first include all primary (new) ids as they are
+		expanded.putAll(tasks);
+
+		// then map each alternative (legacy) id to its vertex; null vertices / job vertices are skipped
+		for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+			if (null != executionJobVertex) {
+				JobVertex jobVertex = executionJobVertex.getJobVertex();
+				if (null != jobVertex) {
+					List<JobVertexID> alternativeIds = jobVertex.getIdAlternatives();
+					for (JobVertexID jobVertexID : alternativeIds) {
+						ExecutionJobVertex old = expanded.put(jobVertexID, executionJobVertex); // must not belong to another vertex
+						Preconditions.checkState(null == old || old.equals(executionJobVertex),
+								"Ambiguous jobvertex id detected during expansion to legacy ids.");
+					}
+				}
+			}
+		}
+
+		return expanded;
+	}
+
 	@Override
 	public ArchivedExecutionJobVertex archive() {
 		return new ArchivedExecutionJobVertex(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 781108c..c4fc907 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 
+import java.util.List;
+
 public class InputFormatVertex extends JobVertex {
 
 	private static final long serialVersionUID = 1L;
@@ -36,6 +38,10 @@ public class InputFormatVertex extends JobVertex {
 	public InputFormatVertex(String name, JobVertexID id) {
 		super(name, id);
 	}
+	// Creates a vertex that, besides its primary id, also carries alternative (e.g. legacy) ids.
+	public InputFormatVertex(String name, JobVertexID id, List<JobVertexID> alternativeIds) {
+		super(name, id, alternativeIds);
+	}
 	
 	
 	public void setFormatDescription(String formatDescription) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 2bda9d8..d24100e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -48,6 +48,8 @@ public class JobVertex implements java.io.Serializable {
 	/** The ID of the vertex. */
 	private final JobVertexID id;
 
+	private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>(); // alternative (e.g. legacy) IDs of the vertex
+
 	/** List of produced data sets, one per writer */
 	private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
 
@@ -117,6 +119,19 @@ public class JobVertex implements java.io.Serializable {
 		this.id = id == null ? new JobVertexID() : id;
 	}
 
+	/**
+	 * Constructs a new job vertex with the given name, primary id and alternative ids.
+	 *
+	 * @param name The name of the new job vertex; if {@code null}, {@code DEFAULT_NAME} is used.
+	 * @param primaryId The id of the job vertex; if {@code null}, a new {@link JobVertexID} is created.
+	 * @param alternativeIds Alternative ids (e.g. legacy ids); copied into this vertex, must not be {@code null}.
+	 */
+	public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds) {
+		this.name = name == null ? DEFAULT_NAME : name;
+		this.id = primaryId == null ? new JobVertexID() : primaryId;
+		this.idAlternatives.addAll(alternativeIds);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -129,6 +144,15 @@ public class JobVertex implements java.io.Serializable {
 	}
 
 	/**
+	 * Returns the alternative IDs of this job vertex (e.g. legacy IDs), in the order they were added.
+	 *
+	 * @return The internal, mutable list of alternative IDs (not a defensive copy)
+	 */
+	public List<JobVertexID> getIdAlternatives() {
+		return idAlternatives;
+	}
+
+	/**
 	 * Returns the name of the vertex.
 	 * 
 	 * @return The name of the vertex.

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
new file mode 100644
index 0000000..44dc0a4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.mockito.Mockito.mock;
+
+public class LegacyJobVertexIdTest {
+	// Tests ExecutionJobVertex#includeLegacyJobVertexIDs for a vertex with one primary and two legacy ids.
+	@Test
+	public void testIntroduceLegacyJobVertexIds() throws Exception {
+		JobVertexID defaultId = new JobVertexID();
+		JobVertexID legacyId1 = new JobVertexID();
+		JobVertexID legacyId2 = new JobVertexID();
+		// a job vertex with a primary (default) id and two alternative (legacy) ids
+		JobVertex jobVertex = new JobVertex("test", defaultId, Arrays.asList(legacyId1, legacyId2));
+		jobVertex.setInvokableClass(AbstractInvokable.class);
+		// execution graph built from mocks; here it is only used to construct the ExecutionJobVertex
+		ExecutionGraph executionGraph = new ExecutionGraph(
+				mock(Executor.class),
+				mock(Executor.class),
+				new JobID(),
+				"test",
+				mock(Configuration.class),
+				mock(SerializedValue.class),
+				Time.seconds(1),
+				mock(RestartStrategy.class));
+
+		ExecutionJobVertex executionJobVertex =
+				new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1));
+		// a map keyed only by the primary id
+		Map<JobVertexID, ExecutionJobVertex> idToVertex = new HashMap<>();
+		idToVertex.put(executionJobVertex.getJobVertexId(), executionJobVertex);
+		// before expansion, only the primary id resolves to the vertex
+		Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
+		Assert.assertNull(idToVertex.get(legacyId1));
+		Assert.assertNull(idToVertex.get(legacyId2));
+		// expand the map with the alternative ids of all contained vertices
+		idToVertex = ExecutionJobVertex.includeLegacyJobVertexIDs(idToVertex);
+		// afterwards, the primary id and both legacy ids resolve to the same vertex
+		Assert.assertEquals(3, idToVertex.size());
+		Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
+		Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1));
+		Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2));
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
new file mode 100644
index 0000000..dec0c18
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphHasher;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/**
+ * StreamGraphHasher from Flink 1.1. This is a frozen copy of the 1.1 hashing code, so that the hashes (and thus
+ * the JobVertexIDs) generated by Flink 1.1 can still be reproduced by future Flink versions.
+ *
+ * <p>DO NOT MODIFY THIS CLASS: any change to the hashing logic may change the generated hashes.
+ */
+public class StreamGraphHasherV1 implements StreamGraphHasher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV1.class);
+
+	@Override
+	public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+		// The hash function used to generate the hash
+		final HashFunction hashFunction = Hashing.murmur3_128(0);
+		final Map<Integer, byte[]> hashes = new HashMap<>();
+
+		Set<Integer> visited = new HashSet<>();
+		Queue<StreamNode> remaining = new ArrayDeque<>();
+
+		// We need to make the source order deterministic. The source IDs are
+		// not returned in the same order, which means that submitting the same
+		// program twice might result in different traversal, which breaks the
+		// deterministic hash assignment.
+		List<Integer> sources = new ArrayList<>();
+		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+			sources.add(sourceNodeId);
+		}
+		Collections.sort(sources);
+
+		//
+		// Traverse the graph in a breadth-first manner. Keep in mind that
+		// the graph is not a tree and multiple paths to nodes can exist.
+		//
+
+		// Start with source nodes
+		for (Integer sourceNodeId : sources) {
+			remaining.add(streamGraph.getStreamNode(sourceNodeId));
+			visited.add(sourceNodeId);
+		}
+
+		StreamNode currentNode;
+		while ((currentNode = remaining.poll()) != null) {
+			// Generate the hash code. Because multiple path exist to each
+			// node, we might not have all required inputs available to
+			// generate the hash code.
+			if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
+				// Add the child nodes
+				for (StreamEdge outEdge : currentNode.getOutEdges()) {
+					StreamNode child = outEdge.getTargetVertex();
+
+					if (!visited.contains(child.getId())) {
+						remaining.add(child);
+						visited.add(child.getId());
+					}
+				}
+			} else {
+				// We will revisit this later.
+				visited.remove(currentNode.getId());
+			}
+		}
+
+		return hashes;
+	}
+
+	/**
+	 * Generates a hash for the node and returns whether the operation was successful.
+	 *
+	 * @param node              The node to generate the hash for
+	 * @param hashFunction      The hash function to use
+	 * @param hashes            The current state of generated hashes; the new hash is added to it
+	 * @param isChainingEnabled Whether chaining is enabled for the stream graph
+	 * @return <code>true</code> if the node hash has been generated. <code>false</code>, otherwise. If the
+	 * operation is not successful, the hash needs to be generated later when all input hashes are available.
+	 * @throws UnsupportedOperationException If node has user-specified hash and is intermediate node of a chain
+	 * @throws IllegalArgumentException      If a user-specified hash collides with a previously generated hash
+	 * @throws IllegalStateException         If a hash was already registered for the node (sanity check)
+	 */
+	private boolean generateNodeHash(
+			StreamNode node,
+			HashFunction hashFunction,
+			Map<Integer, byte[]> hashes,
+			boolean isChainingEnabled) {
+
+		// Check for user-specified ID
+		String userSpecifiedHash = node.getTransformationId();
+
+		if (userSpecifiedHash == null) {
+			// Check that all input nodes have their hashes computed
+			for (StreamEdge inEdge : node.getInEdges()) {
+				// If the input node has not been visited yet, the current
+				// node will be visited again at a later point when all input
+				// nodes have been visited and their hashes set.
+				if (!hashes.containsKey(inEdge.getSourceId())) {
+					return false;
+				}
+			}
+
+			Hasher hasher = hashFunction.newHasher();
+			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
+
+			if (hashes.put(node.getId(), hash) != null) {
+				// Sanity check
+				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+						"twice. This is probably a bug in the JobGraph generator.");
+			}
+
+			return true;
+		} else {
+			// Check that this node is not part of a chain. This is currently
+			// not supported, because the runtime takes the snapshots by the
+			// operator ID of the first vertex in a chain. It's OK if the node
+			// has chained outputs.
+			for (StreamEdge inEdge : node.getInEdges()) {
+				if (isChainable(inEdge, isChainingEnabled)) {
+					throw new UnsupportedOperationException("Cannot assign user-specified hash "
+							+ "to intermediate node in chain. This will be supported in future "
+							+ "versions of Flink. As a work around start new chain at task "
+							+ node.getOperatorName() + ".");
+				}
+			}
+
+			Hasher hasher = hashFunction.newHasher();
+			byte[] hash = generateUserSpecifiedHash(node, hasher);
+
+			for (byte[] previousHash : hashes.values()) {
+				if (Arrays.equals(previousHash, hash)) {
+					throw new IllegalArgumentException("Hash collision on user-specified ID. " +
+							"Most likely cause is a non-unique ID. Please check that all IDs " +
+							"specified via `uid(String)` are unique.");
+				}
+			}
+
+			if (hashes.put(node.getId(), hash) != null) {
+				// Sanity check
+				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+						"twice. This is probably a bug in the JobGraph generator.");
+			}
+
+			return true;
+		}
+	}
+
+	/**
+	 * Generates a hash from a user-specified ID (the node's transformation id, UTF-8 encoded).
+	 */
+	private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
+		hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+
+		return hasher.hash().asBytes();
+	}
+
+	/**
+	 * Generates a deterministic hash from node-local properties, chained output nodes and the
+	 * hashes of the input nodes.
+	 */
+	private byte[] generateDeterministicHash(
+			StreamNode node,
+			Hasher hasher,
+			Map<Integer, byte[]> hashes,
+			boolean isChainingEnabled) {
+
+		// Include stream node to hash. We use the current size of the computed
+		// hashes as the ID. We cannot use the node's ID, because it is
+		// assigned from a static counter. This will result in two identical
+		// programs having different hashes.
+		generateNodeLocalHash(node, hasher, hashes.size());
+
+		// Include chained nodes to hash
+		for (StreamEdge outEdge : node.getOutEdges()) {
+			if (isChainable(outEdge, isChainingEnabled)) {
+				StreamNode chainedNode = outEdge.getTargetVertex();
+
+				// Use the hash size again, because the nodes are chained to
+				// this node. This does not add a hash for the chained nodes.
+				generateNodeLocalHash(chainedNode, hasher, hashes.size());
+			}
+		}
+
+		byte[] hash = hasher.hash().asBytes();
+
+		// Make sure that all input nodes have their hash set before entering
+		// this loop (calling this method).
+		for (StreamEdge inEdge : node.getInEdges()) {
+			byte[] otherHash = hashes.get(inEdge.getSourceId());
+
+			// Sanity check
+			if (otherHash == null) {
+				throw new IllegalStateException("Missing hash for input node "
+						+ inEdge.getSourceVertex() + ". Cannot generate hash for "
+						+ node + ".");
+			}
+			// Mix in the input hash byte-wise: (hash[j] * 37) ^ otherHash[j], since * binds tighter than ^.
+			for (int j = 0; j < hash.length; j++) {
+				hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			String udfClassName = "";
+			if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+				udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+						.getUserFunction().getClass().getName();
+			}
+
+			LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
+					"'" + node.toString() + "' {id: " + node.getId() + ", " +
+					"parallelism: " + node.getParallelism() + ", " +
+					"user function: " + udfClassName + "}");
+		}
+
+		return hash;
+	}
+
+	private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
+		StreamNode upStreamVertex = edge.getSourceVertex();
+		StreamNode downStreamVertex = edge.getTargetVertex();
+
+		StreamOperator<?> headOperator = upStreamVertex.getOperator();
+		StreamOperator<?> outOperator = downStreamVertex.getOperator();
+
+		return downStreamVertex.getInEdges().size() == 1
+				&& outOperator != null
+				&& headOperator != null
+				&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
+				&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
+				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
+				headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
+				&& (edge.getPartitioner() instanceof ForwardPartitioner)
+				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+				&& isChainingEnabled;
+	}
+
+	private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
+		hasher.putInt(id);
+
+		hasher.putInt(node.getParallelism());
+
+		if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+			String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+					.getUserFunction().getClass().getName();
+
+			hasher.putString(udfClassName, Charset.forName("UTF-8"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java
new file mode 100644
index 0000000..866fd1f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Map;
+
+/**
+ * Generates hashes for the nodes of a {@link StreamGraph}; implementations may represent different algorithm versions.
+ */
+public interface StreamGraphHasher {
+
+	/**
+	 * Returns a map from {@link StreamNode} id to the hash generated for that node of the {@link
+	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+	 * identify nodes across job submissions if they didn't change.
+	 */
+	Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
new file mode 100644
index 0000000..75b606a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/**
+ * {@link StreamGraphHasher} from Flink 1.2. The code is duplicated on purpose so that the algorithm, and thus the
+ * generated {@link JobVertexID}s, do not change with future Flink versions.
+ *
+ * <p>DO NOT MODIFY THIS CLASS: changing the traversal or the hashed values changes the generated IDs.
+ */
+public class StreamGraphHasherV2 implements StreamGraphHasher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV2.class);
+
+	/**
+	 * Returns a map with a hash for each {@link StreamNode} of the {@link
+	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+	 * identify nodes across job submissions if they didn't change.
+	 *
+	 * <p>The complete {@link StreamGraph} is traversed breadth-first, starting
+	 * from the source nodes in ascending ID order. The hash is either computed
+	 * from the transformation's user-specified id (see
+	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+	 *
+	 * <p>A generated (not user-specified) hash is deterministic with respect to:
+	 * <ul>
+	 * <li>the node's position in the traversal (number of hashes generated before it),
+	 * <li>the number of directly chainable output nodes, and
+	 * <li>the hashes of the input nodes, combined in in-edge order.
+	 * </ul>
+	 *
+	 * @param streamGraph The stream graph whose nodes are hashed
+	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+	 */
+	@Override
+	public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+		// 128-bit Murmur3 with seed 0, i.e. every hash is a 16-byte array
+		final HashFunction hashFunction = Hashing.murmur3_128(0);
+		final Map<Integer, byte[]> hashes = new HashMap<>();
+
+		Set<Integer> visited = new HashSet<>();
+		Queue<StreamNode> remaining = new ArrayDeque<>();
+
+		// We need to make the source order deterministic. The source IDs are
+		// not returned in the same order, which means that submitting the same
+		// program twice might result in a different traversal, which breaks the
+		// deterministic hash assignment.
+		List<Integer> sources = new ArrayList<>();
+		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+			sources.add(sourceNodeId);
+		}
+		Collections.sort(sources);
+
+		//
+		// Traverse the graph in a breadth-first manner. Keep in mind that
+		// the graph is not a tree and multiple paths to nodes can exist.
+		//
+
+		// Start with source nodes
+		for (Integer sourceNodeId : sources) {
+			remaining.add(streamGraph.getStreamNode(sourceNodeId));
+			visited.add(sourceNodeId);
+		}
+
+		StreamNode currentNode;
+		while ((currentNode = remaining.poll()) != null) {
+			// Generate the hash code. Because multiple paths can lead to a
+			// node, some of its inputs might not have a hash yet, in which
+			// case no hash can be generated for it in this round.
+			if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
+				// Add the child nodes
+				for (StreamEdge outEdge : currentNode.getOutEdges()) {
+					StreamNode child = outEdge.getTargetVertex();
+
+					if (!visited.contains(child.getId())) {
+						remaining.add(child);
+						visited.add(child.getId());
+					}
+				}
+			} else {
+				// Un-mark the node so that a later-hashed input can enqueue it again.
+				visited.remove(currentNode.getId());
+			}
+		}
+
+		return hashes;
+	}
+
+	/**
+	 * Generates a hash for the node and returns whether the operation was successful.
+	 *
+	 * @param node              The node to generate the hash for
+	 * @param hashFunction      The hash function to use
+	 * @param hashes            The current state of generated hashes; a new hash is added to it
+	 * @param isChainingEnabled Whether chaining is enabled for the stream graph
+	 * @return <code>true</code> if the node hash has been generated. <code>false</code>, otherwise
+	 * (an input has no hash yet); the hash then needs to be generated at a later point.
+	 * @throws UnsupportedOperationException If the node has a user-specified hash and is chained to an input
+	 * @throws IllegalArgumentException If a user-specified hash collides with a previously generated hash
+	 * @throws IllegalStateException If a hash for the node already exists (internal bug)
+	 */
+	private boolean generateNodeHash(
+			StreamNode node,
+			HashFunction hashFunction,
+			Map<Integer, byte[]> hashes,
+			boolean isChainingEnabled) {
+
+		// Check for user-specified ID (set via uid(String) on the transformation)
+		String userSpecifiedHash = node.getTransformationId();
+
+		if (userSpecifiedHash == null) {
+			// Check that all input nodes have their hashes computed
+			for (StreamEdge inEdge : node.getInEdges()) {
+				// If the input node has not been visited yet, the current
+				// node will be visited again at a later point when all input
+				// nodes have been visited and their hashes set.
+				if (!hashes.containsKey(inEdge.getSourceId())) {
+					return false;
+				}
+			}
+
+			Hasher hasher = hashFunction.newHasher();
+			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
+
+			if (hashes.put(node.getId(), hash) != null) {
+				// Sanity check
+				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+						"twice. This is probably a bug in the JobGraph generator.");
+			}
+
+			return true;
+		} else {
+			// Check that this node is not part of a chain. This is currently
+			// not supported, because the runtime takes the snapshots by the
+			// operator ID of the first vertex in a chain. It's OK if the node
+			// has chained outputs.
+			for (StreamEdge inEdge : node.getInEdges()) {
+				if (isChainable(inEdge, isChainingEnabled)) {
+					throw new UnsupportedOperationException("Cannot assign user-specified hash "
+							+ "to intermediate node in chain. This will be supported in future "
+							+ "versions of Flink. As a work around start new chain at task "
+							+ node.getOperatorName() + ".");
+				}
+			}
+
+			Hasher hasher = hashFunction.newHasher();
+			byte[] hash = generateUserSpecifiedHash(node, hasher);
+
+			for (byte[] previousHash : hashes.values()) {
+				if (Arrays.equals(previousHash, hash)) {
+					throw new IllegalArgumentException("Hash collision on user-specified ID. " +
+							"Most likely cause is a non-unique ID. Please check that all IDs " +
+							"specified via `uid(String)` are unique.");
+				}
+			}
+
+			if (hashes.put(node.getId(), hash) != null) {
+				// Sanity check
+				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+						"twice. This is probably a bug in the JobGraph generator.");
+			}
+
+			return true;
+		}
+	}
+
+	/**
+	 * Generates a hash from the node's user-specified ID (hashed as a UTF-8 string).
+	 */
+	private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
+		hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+
+		return hasher.hash().asBytes();
+	}
+
+	/**
+	 * Generates a deterministic hash from the node's traversal position, its
+	 * chainable output edges and the hashes of its input nodes.
+	 */
+	private byte[] generateDeterministicHash(
+			StreamNode node,
+			Hasher hasher,
+			Map<Integer, byte[]> hashes,
+			boolean isChainingEnabled) {
+
+		// Include stream node to hash. We use the current size of the computed
+		// hashes as the ID. We cannot use the node's ID, because it is
+		// assigned from a static counter. Using it would result in two identical
+		// programs having different hashes.
+		generateNodeLocalHash(node, hasher, hashes.size());
+
+		// Include chained nodes to hash (each one adds the same ID once more)
+		for (StreamEdge outEdge : node.getOutEdges()) {
+			if (isChainable(outEdge, isChainingEnabled)) {
+				StreamNode chainedNode = outEdge.getTargetVertex();
+
+				// Use the hash size again, because the nodes are chained to
+				// this node. This does not add a hash for the chained nodes.
+				generateNodeLocalHash(chainedNode, hasher, hashes.size());
+			}
+		}
+
+		byte[] hash = hasher.hash().asBytes();
+
+		// All input hashes must be set before calling this method. They are mixed
+		// in byte-wise as hash * 37 ^ inputHash, so the in-edge order matters.
+		for (StreamEdge inEdge : node.getInEdges()) {
+			byte[] otherHash = hashes.get(inEdge.getSourceId());
+
+			// Sanity check
+			if (otherHash == null) {
+				throw new IllegalStateException("Missing hash for input node "
+						+ inEdge.getSourceVertex() + ". Cannot generate hash for "
+						+ node + ".");
+			}
+
+			for (int j = 0; j < hash.length; j++) {
+				hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			String udfClassName = "";
+			if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+				udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+						.getUserFunction().getClass().getName();
+			}
+
+			LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
+					"'" + node.toString() + "' {id: " + node.getId() + ", " +
+					"parallelism: " + node.getParallelism() + ", " +
+					"user function: " + udfClassName + "}");
+		}
+
+		return hash;
+	}
+
+	/**
+	 * Applies the {@link Hasher} to the {@link StreamNode}. In this version
+	 * only the given {@code id} is hashed; the node's attributes (e.g. its
+	 * UDF class name) are not inspected. The hasher encapsulates the current
+	 * state of the hash.
+	 *
+	 * <p>The specified ID is local to this node. We cannot use the
+	 * {@link StreamNode#id}, because it is incremented in a static counter.
+	 * Therefore, the IDs for identical jobs would otherwise be different.
+	 */
+	private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
+		// This resolves conflicts for otherwise identical source nodes. BUT
+		// the generated hash codes depend on the ordering of the nodes in the
+		// stream graph.
+		hasher.putInt(id);
+	}
+
+	private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
+		StreamNode upStreamVertex = edge.getSourceVertex();
+		StreamNode downStreamVertex = edge.getTargetVertex();
+
+		StreamOperator<?> headOperator = upStreamVertex.getOperator();
+		StreamOperator<?> outOperator = downStreamVertex.getOperator();
+
+		return downStreamVertex.getInEdges().size() == 1
+				&& outOperator != null
+				&& headOperator != null
+				&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
+				&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
+				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
+				headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
+				&& (edge.getPartitioner() instanceof ForwardPartitioner)
+				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+				&& isChainingEnabled;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 9051891..7085eeb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -17,10 +17,6 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
@@ -32,6 +28,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.Preconditions;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Class representing the operators in the streaming programs, with all their properties.
  */
@@ -272,7 +272,7 @@ public class StreamNode implements Serializable {
 		this.stateKeySerializer = stateKeySerializer;
 	}
 
-	String getTransformationId() {
+	public String getTransformationId() {
 		return transformationId;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 48be2e9..da69b49 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -17,15 +17,13 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
@@ -42,10 +40,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -54,10 +50,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,10 +58,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-
-import static org.apache.flink.util.StringUtils.byteToHexString;
 
 @Internal
 public class StreamingJobGraphGenerator {
@@ -94,8 +83,13 @@ public class StreamingJobGraphGenerator {
 	private Map<Integer, StreamConfig> vertexConfigs;
 	private Map<Integer, String> chainedNames;
 
+	private final StreamGraphHasher defaultStreamGraphHasher;
+	private final List<StreamGraphHasher> legacyStreamGraphHashers;
+
 	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
 		this.streamGraph = streamGraph;
+		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
+		this.legacyStreamGraphHashers = Collections.<StreamGraphHasher>singletonList(new StreamGraphHasherV1());
 	}
 
 	private void init() {
@@ -118,9 +112,15 @@ public class StreamingJobGraphGenerator {
 
 		// Generate deterministic hashes for the nodes in order to identify them across
 		// submission iff they didn't change.
-		Map<Integer, byte[]> hashes = traverseStreamGraphAndGenerateHashes();
+		Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
+
+		// Generate legacy version hashes for backwards compatibility
+		List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
+		for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
+			legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
+		}
 
-		setChaining(hashes);
+		setChaining(hashes, legacyHashes);
 
 		setPhysicalEdges();
 
@@ -164,9 +164,9 @@ public class StreamingJobGraphGenerator {
 	 *
 	 * <p>This will recursively create all {@link JobVertex} instances.
 	 */
-	private void setChaining(Map<Integer, byte[]> hashes) {
+	private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
 		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
-			createChain(sourceNodeId, sourceNodeId, hashes, 0);
+			createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0);
 		}
 	}
 
@@ -174,6 +174,7 @@ public class StreamingJobGraphGenerator {
 			Integer startNodeId,
 			Integer currentNodeId,
 			Map<Integer, byte[]> hashes,
+			List<Map<Integer, byte[]>> legacyHashes,
 			int chainIndex) {
 
 		if (!builtVertices.contains(startNodeId)) {
@@ -192,18 +193,19 @@ public class StreamingJobGraphGenerator {
 			}
 
 			for (StreamEdge chainable : chainableOutputs) {
-				transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), hashes, chainIndex + 1));
+				transitiveOutEdges.addAll(
+						createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1));
 			}
 
 			for (StreamEdge nonChainable : nonChainableOutputs) {
 				transitiveOutEdges.add(nonChainable);
-				createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, 0);
+				createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0);
 			}
 
 			chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
 
 			StreamConfig config = currentNodeId.equals(startNodeId)
-					? createJobVertex(startNodeId, hashes)
+					? createJobVertex(startNodeId, hashes, legacyHashes)
 					: new StreamConfig(new Configuration());
 
 			setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
@@ -261,7 +263,8 @@ public class StreamingJobGraphGenerator {
 
 	private StreamConfig createJobVertex(
 			Integer streamNodeId,
-			Map<Integer, byte[]> hashes) {
+			Map<Integer, byte[]> hashes,
+			List<Map<Integer, byte[]>> legacyHashes) {
 
 		JobVertex jobVertex;
 		StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
@@ -275,16 +278,26 @@ public class StreamingJobGraphGenerator {
 
 		JobVertexID jobVertexId = new JobVertexID(hash);
 
+		List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
+		for (Map<Integer, byte[]> legacyHash : legacyHashes) {
+			hash = legacyHash.get(streamNodeId);
+			if (null != hash) {
+				legacyJobVertexIds.add(new JobVertexID(hash));
+			}
+		}
+
 		if (streamNode.getInputFormat() != null) {
 			jobVertex = new InputFormatVertex(
 					chainedNames.get(streamNodeId),
-					jobVertexId);
+					jobVertexId,
+					legacyJobVertexIds);
 			TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
 			taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
 		} else {
 			jobVertex = new JobVertex(
 					chainedNames.get(streamNodeId),
-					jobVertexId);
+					jobVertexId,
+					legacyJobVertexIds);
 		}
 
 		jobVertex.setInvokableClass(streamNode.getJobVertexClass());
@@ -518,249 +531,4 @@ public class StreamingJobGraphGenerator {
 				externalizedCheckpointSettings);
 		jobGraph.setSnapshotSettings(settings);
 	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns a map with a hash for each {@link StreamNode} of the {@link
-	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
-	 * identify nodes across job submissions if they didn't change.
-	 *
-	 * <p>The complete {@link StreamGraph} is traversed. The hash is either
-	 * computed from the transformation's user-specified id (see
-	 * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
-	 *
-	 * <p>The generated hash is deterministic with respect to:
-	 * <ul>
-	 * <li>node-local properties (like parallelism, UDF, node ID),
-	 * <li>chained output nodes, and
-	 * <li>input nodes hashes
-	 * </ul>
-	 *
-	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
-	 */
-	private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
-		// The hash function used to generate the hash
-		final HashFunction hashFunction = Hashing.murmur3_128(0);
-		final Map<Integer, byte[]> hashes = new HashMap<>();
-
-		Set<Integer> visited = new HashSet<>();
-		Queue<StreamNode> remaining = new ArrayDeque<>();
-
-		// We need to make the source order deterministic. The source IDs are
-		// not returned in the same order, which means that submitting the same
-		// program twice might result in different traversal, which breaks the
-		// deterministic hash assignment.
-		List<Integer> sources = new ArrayList<>();
-		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
-			sources.add(sourceNodeId);
-		}
-		Collections.sort(sources);
-
-		//
-		// Traverse the graph in a breadth-first manner. Keep in mind that
-		// the graph is not a tree and multiple paths to nodes can exist.
-		//
-
-		// Start with source nodes
-		for (Integer sourceNodeId : sources) {
-			remaining.add(streamGraph.getStreamNode(sourceNodeId));
-			visited.add(sourceNodeId);
-		}
-
-		StreamNode currentNode;
-		while ((currentNode = remaining.poll()) != null) {
-			// Generate the hash code. Because multiple path exist to each
-			// node, we might not have all required inputs available to
-			// generate the hash code.
-			if (generateNodeHash(currentNode, hashFunction, hashes)) {
-				// Add the child nodes
-				for (StreamEdge outEdge : currentNode.getOutEdges()) {
-					StreamNode child = outEdge.getTargetVertex();
-
-					if (!visited.contains(child.getId())) {
-						remaining.add(child);
-						visited.add(child.getId());
-					}
-				}
-			}
-			else {
-				// We will revisit this later.
-				visited.remove(currentNode.getId());
-			}
-		}
-
-		return hashes;
-	}
-
-	/**
-	 * Generates a hash for the node and returns whether the operation was
-	 * successful.
-	 *
-	 * @param node         The node to generate the hash for
-	 * @param hashFunction The hash function to use
-	 * @param hashes       The current state of generated hashes
-	 * @return <code>true</code> if the node hash has been generated.
-	 * <code>false</code>, otherwise. If the operation is not successful, the
-	 * hash needs be generated at a later point when all input is available.
-	 * @throws IllegalStateException If node has user-specified hash and is
-	 *                               intermediate node of a chain
-	 */
-	private boolean generateNodeHash(
-			StreamNode node,
-			HashFunction hashFunction,
-			Map<Integer, byte[]> hashes) {
-
-		// Check for user-specified ID
-		String userSpecifiedHash = node.getTransformationId();
-
-		if (userSpecifiedHash == null) {
-			// Check that all input nodes have their hashes computed
-			for (StreamEdge inEdge : node.getInEdges()) {
-				// If the input node has not been visited yet, the current
-				// node will be visited again at a later point when all input
-				// nodes have been visited and their hashes set.
-				if (!hashes.containsKey(inEdge.getSourceId())) {
-					return false;
-				}
-			}
-
-			Hasher hasher = hashFunction.newHasher();
-			byte[] hash = generateDeterministicHash(node, hasher, hashes);
-
-			if (hashes.put(node.getId(), hash) != null) {
-				// Sanity check
-				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
-						"twice. This is probably a bug in the JobGraph generator.");
-			}
-
-			return true;
-		}
-		else {
-			// Check that this node is not part of a chain. This is currently
-			// not supported, because the runtime takes the snapshots by the
-			// operator ID of the first vertex in a chain. It's OK if the node
-			// has chained outputs.
-			for (StreamEdge inEdge : node.getInEdges()) {
-				if (isChainable(inEdge)) {
-					throw new UnsupportedOperationException("Cannot assign user-specified hash "
-							+ "to intermediate node in chain. This will be supported in future "
-							+ "versions of Flink. As a work around start new chain at task "
-							+ node.getOperatorName() + ".");
-				}
-			}
-
-			Hasher hasher = hashFunction.newHasher();
-			byte[] hash = generateUserSpecifiedHash(node, hasher);
-
-			for (byte[] previousHash : hashes.values()) {
-				if (Arrays.equals(previousHash, hash)) {
-					throw new IllegalArgumentException("Hash collision on user-specified ID. " +
-							"Most likely cause is a non-unique ID. Please check that all IDs " +
-							"specified via `uid(String)` are unique.");
-				}
-			}
-
-			if (hashes.put(node.getId(), hash) != null) {
-				// Sanity check
-				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
-						"twice. This is probably a bug in the JobGraph generator.");
-			}
-
-			return true;
-		}
-	}
-
-	/**
-	 * Generates a hash from a user-specified ID.
-	 */
-	private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
-		hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
-
-		return hasher.hash().asBytes();
-	}
-
-	/**
-	 * Generates a deterministic hash from node-local properties and input and
-	 * output edges.
-	 */
-	private byte[] generateDeterministicHash(
-			StreamNode node,
-			Hasher hasher,
-			Map<Integer, byte[]> hashes) {
-
-		// Include stream node to hash. We use the current size of the computed
-		// hashes as the ID. We cannot use the node's ID, because it is
-		// assigned from a static counter. This will result in two identical
-		// programs having different hashes.
-		generateNodeLocalHash(node, hasher, hashes.size());
-
-		// Include chained nodes to hash
-		for (StreamEdge outEdge : node.getOutEdges()) {
-			if (isChainable(outEdge)) {
-				StreamNode chainedNode = outEdge.getTargetVertex();
-
-				// Use the hash size again, because the nodes are chained to
-				// this node. This does not add a hash for the chained nodes.
-				generateNodeLocalHash(chainedNode, hasher, hashes.size());
-			}
-		}
-
-		byte[] hash = hasher.hash().asBytes();
-
-		// Make sure that all input nodes have their hash set before entering
-		// this loop (calling this method).
-		for (StreamEdge inEdge : node.getInEdges()) {
-			byte[] otherHash = hashes.get(inEdge.getSourceId());
-
-			// Sanity check
-			if (otherHash == null) {
-				throw new IllegalStateException("Missing hash for input node "
-						+ inEdge.getSourceVertex() + ". Cannot generate hash for "
-						+ node + ".");
-			}
-
-			for (int j = 0; j < hash.length; j++) {
-				hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
-			}
-		}
-
-		if (LOG.isDebugEnabled()) {
-			String udfClassName = "";
-			if (node.getOperator() instanceof AbstractUdfStreamOperator) {
-				udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
-						.getUserFunction().getClass().getName();
-			}
-
-			LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
-					"'" + node.toString() + "' {id: " + node.getId() + ", " +
-					"parallelism: " + node.getParallelism() + ", " +
-					"user function: " + udfClassName + "}");
-		}
-
-		return hash;
-	}
-
-	/**
-	 * Applies the {@link Hasher} to the {@link StreamNode} (only node local
-	 * attributes are taken into account). The hasher encapsulates the current
-	 * state of the hash.
-	 *
-	 * <p>The specified ID is local to this node. We cannot use the
-	 * {@link StreamNode#id}, because it is incremented in a static counter.
-	 * Therefore, the IDs for identical jobs will otherwise be different.
-	 */
-	private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
-		// This resolves conflicts for otherwise identical source nodes. BUT
-		// the generated hash codes depend on the ordering of the nodes in the
-		// stream graph.
-		hasher.putInt(id);
-
-		if (node.getOperator() instanceof AbstractUdfStreamOperator) {
-			String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
-					.getUserFunction().getClass().getName();
-
-			hasher.putString(udfClassName, Charset.forName("UTF-8"));
-		}
-	}
 }


Mime
View raw message