flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] git commit: Make sure that operator names are properly escaped for display in the web client
Date Tue, 04 Nov 2014 11:33:41 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master e1fd6899e -> 9b2594234


Make sure that operator names are properly escaped for display in the web client


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ca9e6ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ca9e6ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ca9e6ab1

Branch: refs/heads/master
Commit: ca9e6ab1fad578a16ffe3426077a2f9df363fb25
Parents: 7d2db95
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Nov 4 11:10:47 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Nov 4 11:31:16 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |  6 +-
 .../flink/client/program/PackagedProgram.java   | 45 +++++++++-
 .../flink/client/web/JobSubmissionServlet.java  |  5 +-
 .../program/ExecutionPlanCreationTest.java      | 95 ++++++++++++++++++++
 .../plandump/PlanJSONDumpGenerator.java         | 34 +++++--
 5 files changed, 174 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca9e6ab1/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index dcb54ac..70b5f9b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -177,9 +177,9 @@ public class Client {
 			}
 			
 			throw new ProgramInvocationException(
-					"The program plan could not be fetched. The program silently swallowed the control flow
exceptions.\n"
-					+ "System.err: "+StringEscapeUtils.escapeHtml4(baes.toString())+" \n"
-					+ "System.out: "+StringEscapeUtils.escapeHtml4(baos.toString())+" \n" );
+					"The program plan could not be fetched - the program aborted pre-maturely. <br/><br/>"
+					+ "System.err: "+StringEscapeUtils.escapeHtml4(baes.toString())+" <br/>"
+					+ "System.out: "+StringEscapeUtils.escapeHtml4(baos.toString())+" <br/>" );
 		}
 		else {
 			throw new RuntimeException();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca9e6ab1/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index f63da75..4d54f6b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -165,6 +165,43 @@ public class PackagedProgram {
 		}
 	}
 	
+	public PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException
{
+		this.jarFile = null;
+		this.args = args == null ? new String[0] : args;
+		
+		this.extractedTempLibraries = Collections.emptyList();
+		this.userCodeClassLoader = entryPointClass.getClassLoader();
+		
+		// load the entry point class
+		this.mainClass = entryPointClass;
+		
+		// if the entry point is a program, instantiate the class and get the plan
+		if (Program.class.isAssignableFrom(this.mainClass)) {
+			Program prg = null;
+			try {
+				prg = InstantiationUtil.instantiate(this.mainClass.asSubclass(Program.class), Program.class);
+			} catch (Exception e) {
+				// validate that the class has a main method at least.
+				// the main method possibly instantiates the program properly
+				if (!hasMainMethod(mainClass)) {
+					throw new ProgramInvocationException("The given program class implements the " + 
+							Program.class.getName() + " interface, but cannot be instantiated. " +
+							"It also declares no main(String[]) method as alternative entry point", e);
+				}
+			} catch (Throwable t) {
+				throw new ProgramInvocationException("Error while trying to instantiate program class.",
t);
+			}
+			this.program = prg;
+		} else if (hasMainMethod(mainClass)) {
+			this.program = null;
+		} else {
+			throw new ProgramInvocationException("The given program class neither has a main(String[])
method, nor does it implement the " + 
+					Program.class.getName() + " interface.");
+		}
+	}
+	
+	
+	
 	public String[] getArguments() {
 		return this.args;
 	}
@@ -191,7 +228,9 @@ public class PackagedProgram {
 		if (isUsingProgramEntryPoint()) {
 			List<File> allJars = new ArrayList<File>();
 			
-			allJars.add(jarFile);
+			if (this.jarFile != null) {
+				allJars.add(jarFile);
+			}
 			allJars.addAll(extractedTempLibraries);
 			
 			return new JobWithJars(getPlan(), allJars, userCodeClassLoader);
@@ -321,7 +360,9 @@ public class PackagedProgram {
 
 	public List<File> getAllLibraries() {
 		List<File> libs = new ArrayList<File>(this.extractedTempLibraries.size() +
1);
-		libs.add(jarFile);
+		if (jarFile != null) {
+			libs.add(jarFile);
+		}
 		libs.addAll(this.extractedTempLibraries);
 		return libs;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca9e6ab1/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 535f227..9cc6fad 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -224,7 +224,10 @@ public class JobSubmissionServlet extends HttpServlet {
 				// dump the job to a JSON file
 				String planName = uid + ".json";
 				File jsonFile = new File(this.planDumpDirectory, planName);
-				new PlanJSONDumpGenerator().dumpOptimizerPlanAsJSON(optPlan, jsonFile);
+				
+				PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+				jsonGen.setEncodeForHTML(true);
+				jsonGen.dumpOptimizerPlanAsJSON(optPlan, jsonFile);
 
 				// submit the job only, if it should not be suspended
 				if (!suspend) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca9e6ab1/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
new file mode 100644
index 0000000..83bdc8d
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+public class ExecutionPlanCreationTest {
+
+	@Test
+	public void testGetExecutionPlan() {
+		try {
+			PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
+			assertNotNull(prg.getPreviewPlan());
+			
+			InetAddress mockAddress = InetAddress.getLocalHost();
+			InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
+			
+			Client client = new Client(mockJmAddress, new Configuration(), getClass().getClassLoader());
+			OptimizedPlan op = client.getOptimizedPlan(prg, -1);
+			assertNotNull(op);
+			
+			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+			assertNotNull(dumper.getOptimizerPlanAsJSON(op));
+			
+			// test HTML escaping
+			PlanJSONDumpGenerator dumper2 = new PlanJSONDumpGenerator();
+			dumper2.setEncodeForHTML(true);
+			String htmlEscaped = dumper2.getOptimizerPlanAsJSON(op);
+			
+			assertEquals(-1, htmlEscaped.indexOf('\\'));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	public static class TestOptimizerPlan implements ProgramDescription {
+		
+		@SuppressWarnings("serial")
+		public static void main(String[] args) throws Exception {
+			if (args.length < 2) {
+				System.err.println("Usage: TestOptimizerPlan <input-file-path> <output-file-path>");
+				return;
+			}
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0])
+					.fieldDelimiter('\t').types(Long.class, Long.class);
+			
+			DataSet<Tuple2<Long, Long>> result = input.map(
+					new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+						public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
+							return new Tuple2<Long, Long>(value.f0, value.f1+1);
+						}
+			});
+			result.writeAsCsv(args[1], "\n", "\t");
+			env.execute();
+		}
+		@Override
+		public String getDescription() {
+			return "TestOptimizerPlan <input-file-path> <output-file-path>";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca9e6ab1/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
index 60500b8..41dfd9b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.flink.api.common.operators.CompilerHints;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.dag.BinaryUnionNode;
@@ -53,6 +54,7 @@ import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.util.Utils;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.StringUtils;
 
 /**
  * 
@@ -62,9 +64,20 @@ public class PlanJSONDumpGenerator {
 	private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids
 
 	private int nodeCnt;
+	
+	private boolean encodeForHTML;
 
 	// --------------------------------------------------------------------------------------------
 	
+	public void setEncodeForHTML(boolean encodeForHTML) {
+		this.encodeForHTML = encodeForHTML;
+	}
+	
+	public boolean isEncodeForHTML() {
+		return encodeForHTML;
+	}
+	
+	
 	public void dumpPactPlanAsJSON(List<DataSinkNode> nodes, PrintWriter writer) {
 		@SuppressWarnings("unchecked")
 		List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>)
nodes;
@@ -215,27 +228,38 @@ public class PlanJSONDumpGenerator {
 
 		
 		final String type;
-		final String contents;
+		String contents;
 		if (n instanceof DataSinkNode) {
 			type = "sink";
 			contents = n.getPactContract().toString();
 		} else if (n instanceof DataSourceNode) {
 			type = "source";
 			contents = n.getPactContract().toString();
-		} else if (n instanceof BulkIterationNode) {
+		}
+		else if (n instanceof BulkIterationNode) {
 			type = "bulk_iteration";
 			contents = n.getPactContract().getName();
-		} else if (n instanceof WorksetIterationNode) {
+		}
+		else if (n instanceof WorksetIterationNode) {
 			type = "workset_iteration";
 			contents = n.getPactContract().getName();
-		} else if (n instanceof BinaryUnionNode) {
+		}
+		else if (n instanceof BinaryUnionNode) {
 			type = "pact";
 			contents = "";
-		} else {
+		}
+		else {
 			type = "pact";
 			contents = n.getPactContract().getName();
 		}
 		
+		contents = StringUtils.showControlCharacters(contents);
+		if (encodeForHTML) {
+			contents = StringEscapeUtils.escapeHtml4(contents);
+			contents = contents.replace("\\", "&#92;");
+		}
+		
+		
 		String name = n.getName();
 		if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) && 
 				((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE)
{


Mime
View raw message