flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [03/10] flink git commit: [FLINK-1594] [streaming] Embedded StreamEdges
Date Fri, 20 Mar 2015 12:41:35 GMT
[FLINK-1594] [streaming] Embedded StreamEdges


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8ba72b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8ba72b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8ba72b1

Branch: refs/heads/master
Commit: a8ba72b165dfd2b769783d5612b08559cbf24bf9
Parents: 29a6615
Author: Gábor Hermann <reckoner42@gmail.com>
Authored: Thu Feb 26 16:15:17 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/api/StreamEdge.java  |   2 +-
 .../flink/streaming/api/StreamEdgeList.java     |  48 ++++++-
 .../apache/flink/streaming/api/StreamGraph.java | 125 ++++++++-----------
 .../api/StreamingJobGraphGenerator.java         |  20 +--
 .../flink/streaming/api/WindowingOptimizer.java |  14 +--
 5 files changed, 109 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
index 8743233..479ae93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
@@ -55,7 +55,7 @@ public class StreamEdge {
 		return selectedNames;
 	}
 
-	public StreamPartitioner<?> getOutputPartitioner() {
+	public StreamPartitioner<?> getPartitioner() {
 		return outputPartitioner;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
index 85202ab..d15116b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdgeList.java
@@ -90,23 +90,59 @@ public class StreamEdgeList {
 		}
 	}
 
-	public List<StreamEdge> getOutEdges(int i) {
-		List<StreamEdge> outEdges = outEdgeLists.get(i);
+	public StreamEdge getEdge(int sourceId, int targetId) {
+		Iterator<StreamEdge> outIterator = outEdgeLists.get(sourceId).iterator();
+		while (outIterator.hasNext()) {
+			StreamEdge edge = outIterator.next();
+
+			if (edge.getTargetVertex() == targetId) {
+				return edge;
+			}
+		}
+
+		throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
+	}
+
+	public List<StreamEdge> getOutEdges(int vertexId) {
+		List<StreamEdge> outEdges = outEdgeLists.get(vertexId);
 
 		if (outEdges == null) {
-			throw new RuntimeException("No such vertex in stream graph: " + i);
+			throw new RuntimeException("No such vertex in stream graph: " + vertexId);
 		}
 
 		return outEdges;
 	}
 
-	public List<StreamEdge> getInEdges(int i) {
-		List<StreamEdge> inEdges = inEdgeLists.get(i);
+	public List<StreamEdge> getInEdges(int vertexId) {
+		List<StreamEdge> inEdges = inEdgeLists.get(vertexId);
 
 		if (inEdges == null) {
-			throw new RuntimeException("No such vertex in stream graph: " + i);
+			throw new RuntimeException("No such vertex in stream graph: " + vertexId);
 		}
 
 		return inEdges;
 	}
+
+	public List<Integer> getOutEdgeIndices(int vertexId) {
+		List<StreamEdge> outEdges = getOutEdges(vertexId);
+		List<Integer> outEdgeIndices = new ArrayList<Integer>();
+
+		for (StreamEdge edge : outEdges) {
+			outEdgeIndices.add(edge.getTargetVertex());
+		}
+
+		return outEdgeIndices;
+	}
+
+	public List<Integer> getInEdgeIndices(int vertexId) {
+		List<StreamEdge> inEdges = getInEdges(vertexId);
+
+		List<Integer> inEdgeIndices = new ArrayList<Integer>();
+
+		for (StreamEdge edge : inEdges) {
+			inEdgeIndices.add(edge.getSourceVertex());
+		}
+
+		return inEdgeIndices;
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 9f00c8e..dfe66a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -66,11 +66,11 @@ public class StreamGraph extends StreamingPlan {
 	// Graph attributes
 	private Map<Integer, Integer> operatorParallelisms;
 	private Map<Integer, Long> bufferTimeouts;
-	private Map<Integer, List<Integer>> outEdgeLists;
-	private Map<Integer, List<Integer>> outEdgeTypes;
-	private Map<Integer, List<List<String>>> selectedNames;
-	private Map<Integer, List<Integer>> inEdgeLists;
-	private Map<Integer, List<StreamPartitioner<?>>> outputPartitioners;
+
+	private StreamEdgeList edges;
+
+	private Map<Integer, List<OutputSelector<?>>> outputSelectors;
+
 	private Map<Integer, String> operatorNames;
 	private Map<Integer, StreamInvokable<?, ?>> invokableObjects;
 	private Map<Integer, StreamRecordSerializer<?>> typeSerializersIn1;
@@ -78,7 +78,6 @@ public class StreamGraph extends StreamingPlan {
 	private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut1;
 	private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut2;
 	private Map<Integer, Class<? extends AbstractInvokable>> jobVertexClasses;
-	private Map<Integer, List<OutputSelector<?>>> outputSelectors;
 	private Map<Integer, Integer> iterationIds;
 	private Map<Integer, Integer> iterationIDtoHeadID;
 	private Map<Integer, Integer> iterationIDtoTailID;
@@ -112,17 +111,9 @@ public class StreamGraph extends StreamingPlan {
 		operatorParallelisms = new HashMap<Integer, Integer>();
 		containingMaps.add(operatorParallelisms);
 		bufferTimeouts = new HashMap<Integer, Long>();
-		containingMaps.add(bufferTimeouts);
-		outEdgeLists = new HashMap<Integer, List<Integer>>();
-		containingMaps.add(outEdgeLists);
-		outEdgeTypes = new HashMap<Integer, List<Integer>>();
-		containingMaps.add(outEdgeTypes);
-		selectedNames = new HashMap<Integer, List<List<String>>>();
-		containingMaps.add(selectedNames);
-		inEdgeLists = new HashMap<Integer, List<Integer>>();
-		containingMaps.add(inEdgeLists);
-		outputPartitioners = new HashMap<Integer, List<StreamPartitioner<?>>>();
-		containingMaps.add(outputPartitioners);
+
+		edges = new StreamEdgeList();
+
 		operatorNames = new HashMap<Integer, String>();
 		containingMaps.add(operatorNames);
 		invokableObjects = new HashMap<Integer, StreamInvokable<?, ?>>();
@@ -221,9 +212,10 @@ public class StreamGraph extends StreamingPlan {
 
 		setSerializersFrom(iterationHead, vertexID);
 
-		setEdge(vertexID, iterationHead,
-				outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0,
-				new ArrayList<String>());
+		int outpartitionerIndexToCopy = edges.getInEdgeIndices(iterationHead).get(0);
+		StreamPartitioner<?> outputPartitioner = edges.getOutEdges(outpartitionerIndexToCopy).get(0).getPartitioner();
+
+		setEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
 
 		iterationTimeouts.put(iterationIDtoHeadID.get(iterationID), waitTime);
 
@@ -290,7 +282,7 @@ public class StreamGraph extends StreamingPlan {
 
 	/**
 	 * Sets vertex parameters in the JobGraph
-	 * 
+	 *
 	 * @param vertexID
 	 *            Name of the vertex
 	 * @param vertexClass
@@ -307,12 +299,10 @@ public class StreamGraph extends StreamingPlan {
 		setParallelism(vertexID, parallelism);
 		invokableObjects.put(vertexID, invokableObject);
 		operatorNames.put(vertexID, operatorName);
-		outEdgeLists.put(vertexID, new ArrayList<Integer>());
-		outEdgeTypes.put(vertexID, new ArrayList<Integer>());
-		selectedNames.put(vertexID, new ArrayList<List<String>>());
+
+		edges.addVertex(vertexID);
 		outputSelectors.put(vertexID, new ArrayList<OutputSelector<?>>());
-		inEdgeLists.put(vertexID, new ArrayList<Integer>());
-		outputPartitioners.put(vertexID, new ArrayList<StreamPartitioner<?>>());
+
 		iterationTailCount.put(vertexID, 0);
 	}
 
@@ -333,40 +323,21 @@ public class StreamGraph extends StreamingPlan {
 	 */
 	public void setEdge(Integer upStreamVertexID, Integer downStreamVertexID,
 			StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
-		outEdgeLists.get(upStreamVertexID).add(downStreamVertexID);
-		outEdgeTypes.get(upStreamVertexID).add(typeNumber);
-		inEdgeLists.get(downStreamVertexID).add(upStreamVertexID);
-		outputPartitioners.get(upStreamVertexID).add(partitionerObject);
-		selectedNames.get(upStreamVertexID).add(outputNames);
+
+		StreamEdge edge = new StreamEdge(upStreamVertexID, downStreamVertexID, typeNumber, outputNames, partitionerObject);
+		edges.addEdge(edge);
 	}
 
 	public void removeEdge(Integer upStream, Integer downStream) {
-		int inputIndex = getInEdges(downStream).indexOf(upStream);
-		inEdgeLists.get(downStream).remove(inputIndex);
-
-		int outputIndex = getOutEdges(upStream).indexOf(downStream);
-		outEdgeLists.get(upStream).remove(outputIndex);
-		outEdgeTypes.get(upStream).remove(outputIndex);
-		selectedNames.get(upStream).remove(outputIndex);
-		outputPartitioners.get(upStream).remove(outputIndex);
+		edges.removeEdge(upStream, downStream);
 	}
 
 	public void removeVertex(Integer toRemove) {
-		List<Integer> outEdges = new ArrayList<Integer>(getOutEdges(toRemove));
-		List<Integer> inEdges = new ArrayList<Integer>(getInEdges(toRemove));
-
-		for (Integer output : outEdges) {
-			removeEdge(toRemove, output);
-		}
-
-		for (Integer input : inEdges) {
-			removeEdge(input, toRemove);
-		}
+		edges.removeVertex(toRemove);
 
 		for (Map<Integer, ?> map : containingMaps) {
 			map.remove(toRemove);
 		}
-
 	}
 
 	private void addTypeSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
@@ -418,11 +389,11 @@ public class StreamGraph extends StreamingPlan {
 	/**
 	 * Sets a user defined {@link OutputSelector} for the given operator. Used
 	 * for directed emits.
-	 * 
+	 *
 	 * @param vertexID
-	 *            Name of the vertex for which the output selector will be set
+	 * 		Name of the vertex for which the output selector will be set
 	 * @param outputSelector
-	 *            The user defined output selector.
+	 * 		The user defined output selector.
 	 */
 	public <T> void setOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
 		outputSelectors.get(vertexID).add(outputSelector);
@@ -470,11 +441,11 @@ public class StreamGraph extends StreamingPlan {
 	/**
 	 * Sets TypeSerializerWrapper from one vertex to another, used with some
 	 * sinks.
-	 * 
+	 *
 	 * @param from
-	 *            from
+	 * 		from
 	 * @param to
-	 *            to
+	 * 		to
 	 */
 	public void setSerializersFrom(Integer from, Integer to) {
 		operatorNames.put(to, operatorNames.get(from));
@@ -495,9 +466,9 @@ public class StreamGraph extends StreamingPlan {
 	/**
 	 * Gets the assembled {@link JobGraph} and adds a user specified name for
 	 * it.
-	 * 
+	 *
 	 * @param jobGraphName
-	 *            name of the jobGraph
+	 * 		name of the jobGraph
 	 */
 	public JobGraph getJobGraph(String jobGraphName) {
 
@@ -526,28 +497,24 @@ public class StreamGraph extends StreamingPlan {
 		return sources;
 	}
 
-	public List<Integer> getOutEdges(Integer vertexID) {
-		return outEdgeLists.get(vertexID);
+	public StreamEdge getEdge(Integer sourceId, Integer targetId) {
+		return edges.getEdge(sourceId, targetId);
 	}
 
-	public List<Integer> getInEdges(Integer vertexID) {
-		return inEdgeLists.get(vertexID);
+	public List<StreamEdge> getOutEdges(Integer vertexID) {
+		return edges.getOutEdges(vertexID);
 	}
 
-	public List<Integer> getOutEdgeTypes(Integer vertexID) {
-
-		return outEdgeTypes.get(vertexID);
+	public List<StreamEdge> getInEdges(Integer vertexID) {
+		return edges.getInEdges(vertexID);
 	}
 
-	public StreamPartitioner<?> getOutPartitioner(Integer upStreamVertex, Integer downStreamVertex) {
-		return outputPartitioners.get(upStreamVertex).get(
-				outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
+	public List<Integer> getOutEdgeIndices(Integer vertexID) {
+		return edges.getOutEdgeIndices(vertexID);
 	}
 
-	public List<String> getSelectedNames(Integer upStreamVertex, Integer downStreamVertex) {
-
-		return selectedNames.get(upStreamVertex).get(
-				outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
+	public List<Integer> getInEdgeIndices(Integer vertexID) {
+		return edges.getInEdgeIndices(vertexID);
 	}
 
 	public Collection<Integer> getIterationIDs() {
@@ -668,7 +635,9 @@ public class StreamGraph extends StreamingPlan {
 					JSONArray inputs = new JSONArray();
 					node.put(PREDECESSORS, inputs);
 
-					for (int inputID : getInEdges(vertexID)) {
+					for (StreamEdge inEdge : getInEdges(vertexID)) {
+						int inputID = inEdge.getSourceVertex();
+
 						Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ?
 								edgeRemapings.get(inputID) : inputID;
 						decorateEdge(inputs, vertexID, mappedID, inputID);
@@ -678,7 +647,9 @@ public class StreamGraph extends StreamingPlan {
 				toVisit.remove(vertexID);
 			} else {
 				Integer iterationHead = -1;
-				for (int operator : getInEdges(vertexID)) {
+				for (StreamEdge inEdge : getInEdges(vertexID)) {
+					int operator = inEdge.getSourceVertex();
+
 					if (iterationIds.keySet().contains(operator)) {
 						iterationHead = operator;
 					}
@@ -718,7 +689,9 @@ public class StreamGraph extends StreamingPlan {
 				JSONArray inEdges = new JSONArray();
 				obj.put(PREDECESSORS, inEdges);
 
-				for (int inputID : getInEdges(vertexID)) {
+				for (StreamEdge inEdge : getInEdges(vertexID)) {
+					int inputID = inEdge.getSourceVertex();
+
 					if (edgeRemapings.keySet().contains(inputID)) {
 						decorateEdge(inEdges, vertexID, inputID, inputID);
 					} else if (!iterationIds.containsKey(inputID)) {
@@ -737,7 +710,7 @@ public class StreamGraph extends StreamingPlan {
 			JSONObject input = new JSONObject();
 			inputArray.put(input);
 			input.put(ID, mappedInputID);
-			input.put(SHIP_STRATEGY, getOutPartitioner(inputID, vertexID).getStrategy());
+			input.put(SHIP_STRATEGY, edges.getEdge(inputID, vertexID).getPartitioner().getStrategy());
 			input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index ecb6455..607d041 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -104,11 +104,12 @@ public class StreamingJobGraphGenerator {
 			List<Integer> chainableOutputs = new ArrayList<Integer>();
 			List<Integer> nonChainableOutputs = new ArrayList<Integer>();
 
-			for (Integer outName : streamGraph.getOutEdges(current)) {
-				if (isChainable(current, outName)) {
-					chainableOutputs.add(outName);
+			for (StreamEdge outEdge : streamGraph.getOutEdges(current)) {
+				Integer outID = outEdge.getTargetVertex();
+				if (isChainable(current, outID)) {
+					chainableOutputs.add(outID);
 				} else {
-					nonChainableOutputs.add(outName);
+					nonChainableOutputs.add(outID);
 				}
 			}
 
@@ -230,7 +231,7 @@ public class StreamingJobGraphGenerator {
 		allOutputs.addAll(nonChainableOutputs);
 
 		for (Integer output : allOutputs) {
-			config.setSelectedNames(output, streamGraph.getSelectedNames(vertexID, output));
+			config.setSelectedNames(output, streamGraph.getEdge(vertexID, output).getSelectedNames());
 		}
 
 		vertexConfigs.put(vertexID, config);
@@ -251,14 +252,13 @@ public class StreamingJobGraphGenerator {
 				headVertex.getConfiguration()) : chainedConfigs.get(headOfChain).get(
 				upStreamvertexID);
 
-		List<Integer> outEdgeIndexList = streamGraph.getOutEdgeTypes(upStreamvertexID);
+//		List<Integer> outEdgeIndexList = streamGraph.getOutEdgeTypes(upStreamvertexID);
 		int numOfInputs = downStreamConfig.getNumberOfInputs();
 
-		downStreamConfig.setInputIndex(numOfInputs++, outEdgeIndexList.get(outputIndex));
+		downStreamConfig.setInputIndex(numOfInputs++, streamGraph.getEdge(upStreamvertexID, downStreamvertexID).getTypeNumber());
 		downStreamConfig.setNumberOfInputs(numOfInputs);
 
-		StreamPartitioner<?> partitioner = streamGraph.getOutPartitioner(upStreamvertexID,
-				downStreamvertexID);
+		StreamPartitioner<?> partitioner = streamGraph.getEdge(upStreamvertexID, downStreamvertexID).getPartitioner();
 
 		upStreamConfig.setPartitioner(downStreamvertexID, partitioner);
 
@@ -284,7 +284,7 @@ public class StreamingJobGraphGenerator {
 				&& outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS
 				&& (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable
 						.getChainingStrategy() == ChainingStrategy.ALWAYS)
-				&& streamGraph.getOutPartitioner(vertexID, outName).getStrategy() == PartitioningStrategy.FORWARD
+				&& streamGraph.getEdge(vertexID, outName).getPartitioner().getStrategy() == PartitioningStrategy.FORWARD
 				&& streamGraph.getParallelism(vertexID) == streamGraph.getParallelism(outName)
 				&& streamGraph.chaining;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a8ba72b1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
index e2cbc4b..3e98bda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimizer.java
@@ -54,13 +54,13 @@ public class WindowingOptimizer {
 
 		for (Integer flattener : flatteners) {
 			// Flatteners should have exactly one input
-			Integer input = streamGraph.getInEdges(flattener).get(0);
+			Integer input = streamGraph.getInEdges(flattener).get(0).getSourceVertex();
 
 			// Check whether the flatten is applied after a merge
 			if (streamGraph.getInvokable(input) instanceof WindowMerger) {
 
 				// Mergers should have exactly one input
-				Integer mergeInput = streamGraph.getInEdges(input).get(0);
+				Integer mergeInput = streamGraph.getInEdges(input).get(0).getSourceVertex();
 				streamGraph.setEdge(mergeInput, flattener, new DistributePartitioner(true), 0,
 						new ArrayList<String>());
 
@@ -97,9 +97,9 @@ public class WindowingOptimizer {
 			boolean inMatching = false;
 			for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) {
 				Set<Integer> discretizerInEdges = new HashSet<Integer>(
-						streamGraph.getInEdges(discretizer.f0));
+						streamGraph.getInEdgeIndices(discretizer.f0));
 				Set<Integer> matchingInEdges = new HashSet<Integer>(
-						streamGraph.getInEdges(matching.f1.get(0)));
+						streamGraph.getInEdgeIndices(matching.f1.get(0)));
 
 				if (discretizer.f1.equals(matching.f0)
 						&& discretizerInEdges.equals(matchingInEdges)) {
@@ -130,7 +130,7 @@ public class WindowingOptimizer {
 	private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplace,
 			Integer replaceWith) {
 		// Convert to array to create a copy
-		List<Integer> outEdges = new ArrayList<Integer>(streamGraph.getOutEdges(toReplace));
+		List<Integer> outEdges = new ArrayList<Integer>(streamGraph.getOutEdgeIndices(toReplace));
 
 		int numOutputs = outEdges.size();
 
@@ -139,11 +139,11 @@ public class WindowingOptimizer {
 			Integer output = outEdges.get(i);
 
 			streamGraph.setEdge(replaceWith, output,
-					streamGraph.getOutPartitioner(toReplace, output), 0, new ArrayList<String>());
+					streamGraph.getEdge(toReplace, output).getPartitioner(), 0, new ArrayList<String>());
 			streamGraph.removeEdge(toReplace, output);
 		}
 
-		List<Integer> inEdges = new ArrayList<Integer>(streamGraph.getInEdges(toReplace));
+		List<Integer> inEdges = new ArrayList<Integer>(streamGraph.getInEdgeIndices(toReplace));
 		// Remove inputs
 		for (Integer input : inEdges) {
 			streamGraph.removeEdge(input, toReplace);


Mime
View raw message