flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [1/2] flink git commit: [FLINK-1586] [streaming] Add support for iterative streaming graphs on JSON generation Closes #432
Date Sat, 28 Feb 2015 17:32:27 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3dc2fe1dc -> 9ee8f4254


[FLINK-1586] [streaming] Add support for iterative streaming graphs on JSON generation
Closes #432


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e91c1da3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e91c1da3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e91c1da3

Branch: refs/heads/master
Commit: e91c1da3dfa0144527a893d9d2073132c5f4697c
Parents: 3dc2fe1
Author: Paris Carbone <seniorcarbone@gmail.com>
Authored: Sun Feb 22 21:30:53 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sat Feb 28 16:30:09 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/api/StreamGraph.java | 185 +++++++++++++------
 1 file changed, 132 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e91c1da3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 82cd954..0f5ea54 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -616,81 +616,160 @@ public class StreamGraph extends StreamingPlan {
 		WindowingOptimzier.optimizeGraph(this);
 
 		try {
+			return new JSONGenerator().getJSON();
+		} catch (JSONException e) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("JSON plan creation failed: {}", e);
+			}
+			return "";
+		}
+
+	}
+
+	@Override
+	public void dumpStreamingPlanAsJSON(File file) throws IOException {
+		PrintWriter pw = null;
+		try {
+			pw = new PrintWriter(new FileOutputStream(file), false);
+			pw.write(getStreamingPlanAsJSON());
+			pw.flush();
+
+		} finally {
+			if (pw != null) {
+				pw.close();
+			}
+		}
+	}
+
+	private class JSONGenerator {
+
+		public static final String STEPS = "step_function";
+		public static final String ID = "id";
+		public static final String SIDE = "side";
+		public static final String SHIP_STRATEGY = "ship_strategy";
+		public static final String PREDECESSORS = "predecessors";
+		public static final String TYPE = "type";
+		public static final String PACT = "pact";
+		public static final String CONTENTS = "contents";
+		public static final String PARALLELISM = "parallelism";
+
+		public String getJSON() throws JSONException {
 			JSONObject json = new JSONObject();
 			JSONArray nodes = new JSONArray();
-
 			json.put("nodes", nodes);
 			List<Integer> operatorIDs = new ArrayList<Integer>(operatorNames.keySet());
 			Collections.sort(operatorIDs);
+			visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
+			return json.toString();
+		}
 
-			for (Integer id : operatorIDs) {
-				JSONObject node = new JSONObject();
-				nodes.put(node);
-
-				node.put("id", id);
-				node.put("type", getOperatorName(id));
-
-				if (sources.contains(id)) {
-					node.put("pact", "Data Source");
-				} else {
-					node.put("pact", "Data Stream");
-				}
-
-				if (getInvokable(id) != null && getInvokable(id).getUserFunction() != null) {
-					node.put("contents", getOperatorName(id) + " at "
-							+ getInvokable(id).getUserFunction().getClass().getSimpleName());
-				} else {
-					node.put("contents", getOperatorName(id));
-				}
+		private void visit(JSONArray jsonArray, List<Integer> toVisit,
+			Map<Integer, Integer> edgeRemapings) throws JSONException {
 
-				node.put("parallelism", getParallelism(id));
+			Integer vertexID = toVisit.get(0);
+			if (getSources().contains(vertexID) || Collections.disjoint(getInEdges(vertexID), toVisit))
{
 
-				int numIn = getInEdges(id).size();
-				if (numIn > 0) {
+				JSONObject node = new JSONObject();
+				decorateNode(vertexID, node);
 
+				if (!getSources().contains(vertexID)) {
 					JSONArray inputs = new JSONArray();
-					node.put("predecessors", inputs);
+					node.put(PREDECESSORS, inputs);
 
-					for (int i = 0; i < numIn; i++) {
+					for (int inputID : getInEdges(vertexID)) {
+						Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ?
+								edgeRemapings.get(inputID) : inputID;
+						decorateEdge(inputs, vertexID, mappedID, inputID);
+					}
+				}
+				jsonArray.put(node);
+				toVisit.remove(vertexID);
+			} else {
+				Integer iterationHead = -1;
+				for (int operator : getInEdges(vertexID)) {
+					if (iterationIds.keySet().contains(operator)) {
+						iterationHead = operator;
+					}
+				}
 
-						Integer inID = getInEdges(id).get(i);
+				JSONObject obj = new JSONObject();
+				JSONArray iterationSteps = new JSONArray();
+				obj.put(STEPS, iterationSteps);
+				obj.put(ID, iterationHead);
+				obj.put(PACT, "IterativeDataStream");
+				obj.put(PARALLELISM, getParallelism(iterationHead));
+				obj.put(CONTENTS,"Stream Iteration");
+				JSONArray iterationInputs = new JSONArray();
+				obj.put(PREDECESSORS, iterationInputs);
+				toVisit.remove(iterationHead);
+				visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
+				jsonArray.put(obj);
+			}
 
-						JSONObject input = new JSONObject();
-						inputs.put(input);
+			if (!toVisit.isEmpty())
+			{
+				visit(jsonArray, toVisit, edgeRemapings);
+			}
+		}
 
-						input.put("id", inID);
-						input.put("ship_strategy", getOutPartitioner(inID, id).getStrategy());
-						if (i == 0) {
-							input.put("side", "first");
-						} else if (i == 1) {
-							input.put("side", "second");
-						}
+		private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int headId,
+			Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException
{
+
+			Integer vertexID = toVisit.get(0);
+			toVisit.remove(vertexID);
+
+			//Ignoring head and tail to avoid redundancy
+			if (!iterationIds.containsKey(vertexID)) {
+				JSONObject obj = new JSONObject();
+				jsonArray.put(obj);
+				decorateNode(vertexID, obj);
+				JSONArray inEdges = new JSONArray();
+				obj.put(PREDECESSORS, inEdges);
+
+				for (int inputID : getInEdges(vertexID)) {
+					if (edgeRemapings.keySet().contains(inputID)) {
+						decorateEdge(inEdges, vertexID, inputID, inputID);
+					} else if (!iterationIds.containsKey(inputID)) {
+						decorateEdge(iterationInEdges, vertexID, inputID, inputID);
 					}
 				}
 
+				edgeRemapings.put(vertexID, headId);
+				visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
 			}
-			return json.toString();
-		} catch (JSONException e) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("JSON plan creation failed: {}", e);
-			}
-			return "";
+
 		}
 
-	}
+		private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID, int inputID)

+				throws JSONException {
+			JSONObject input = new JSONObject();
+			inputArray.put(input);
+			input.put(ID, mappedInputID);
+			input.put(SHIP_STRATEGY, getOutPartitioner(inputID, vertexID).getStrategy());
+			input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
+		}
 
-	@Override
-	public void dumpStreamingPlanAsJSON(File file) throws IOException {
-		PrintWriter pw = null;
-		try {
-			pw = new PrintWriter(new FileOutputStream(file), false);
-			pw.write(getStreamingPlanAsJSON());
-			pw.flush();
+		private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
+			node.put(ID, vertexID);
+			node.put(TYPE, getOperatorName(vertexID));
 
-		} finally {
-			if (pw != null) {
-				pw.close();
+			if (sources.contains(vertexID)) {
+				node.put(PACT, "Data Source");
+			} else {
+				node.put(PACT, "Data Stream");
+			}
+
+			if (getInvokable(vertexID) != null && getInvokable(vertexID).getUserFunction()
!= null) {
+				node.put(CONTENTS, getOperatorName(vertexID) + " at "
+						+ getInvokable(vertexID).getUserFunction().getClass().getSimpleName());
+			} else {
+				node.put(CONTENTS, getOperatorName(vertexID));
 			}
+
+			node.put(PARALLELISM, getParallelism(vertexID));
 		}
+
+
 	}
-}
+
+}
\ No newline at end of file


Mime
View raw message