flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [4/4] flink git commit: [FLINK-3192] [TableAPI] Add explain support to print the sql-execution plan.
Date Tue, 12 Jan 2016 23:23:49 GMT
[FLINK-3192] [TableAPI] Add explain support to print the sql-execution plan.

This closes #1477


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbbab0a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbbab0a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbbab0a9

Branch: refs/heads/master
Commit: dbbab0a90b82d18ac14e4791d8f155c2e039b3ee
Parents: b474da6
Author: gallenvara <gallenvara@126.com>
Authored: Fri Dec 11 13:57:51 2015 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Jan 12 22:27:07 2016 +0100

----------------------------------------------------------------------
 flink-staging/flink-table/pom.xml               |   6 +
 .../org/apache/flink/api/table/Table.scala      |  23 +++
 .../apache/flink/api/table/explain/Node.java    | 145 +++++++++++++
 .../flink/api/table/explain/PlanJsonParser.java | 144 +++++++++++++
 .../flink/api/table/plan/operations.scala       |   1 +
 .../api/java/table/test/SqlExplainITCase.java   | 206 +++++++++++++++++++
 .../api/scala/table/test/SqlExplainITCase.scala |  96 +++++++++
 .../src/test/scala/resources/testFilter0.out    |  28 +++
 .../src/test/scala/resources/testFilter1.out    |  96 +++++++++
 .../src/test/scala/resources/testJoin0.out      |  39 ++++
 .../src/test/scala/resources/testJoin1.out      | 141 +++++++++++++
 .../src/test/scala/resources/testUnion0.out     |  38 ++++
 .../src/test/scala/resources/testUnion1.out     | 140 +++++++++++++
 pom.xml                                         |   1 +
 14 files changed, 1104 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml
index 45ea785..bdd1b58 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -94,6 +94,12 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+			<version>${jackson.version}</version>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
index 6ece212..641f2fa 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -17,10 +17,14 @@
  */
 package org.apache.flink.api.table
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
+import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer,
SelectionAnalyzer}
 import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference}
 import org.apache.flink.api.table.parser.ExpressionParser
 import org.apache.flink.api.table.plan._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
 
 /**
  * The abstraction for writing Table API programs. Similar to how the batch and streaming
APIs
@@ -267,5 +271,24 @@ case class Table(private[flink] val operation: PlanNode) {
     this.copy(operation = UnionAll(operation, right.operation))
   }
 
+  /**
+   * Get the process of the sql parsing, print AST and physical execution plan.The AST
+   * show the structure of the supplied statement. The execution plan shows how the table

+   * referenced by the statement will be scanned.
+   */
+  def explain(extended: Boolean): String = {
+    val ast = operation
+    val dataSet = this.toDataSet[Row]
+    val env = dataSet.getExecutionEnvironment
+    dataSet.output(new DiscardingOutputFormat[Row])
+    val jasonSqlPlan = env.getExecutionPlan()
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
+    val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan
==" +
+      "\n" + sqlPlan
+    return result
+  }
+  
+  def explain(): String = explain(false)
+  
   override def toString: String = s"Expression($operation)"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
new file mode 100644
index 0000000..9152260
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.explain;
+
+import java.util.List;
+
+public class Node {
+	private int id;
+	private String type;
+	private String pact;
+	private String contents;
+	private int parallelism;
+	private String driver_strategy;
+	private List<Predecessors> predecessors;
+	private List<Global_properties> global_properties;
+	private List<LocalProperty> local_properties;
+	private List<Estimates> estimates;
+	private List<Costs> costs;
+	private List<Compiler_hints> compiler_hints;
+
+	public int getId() {
+		return id;
+	}
+	public String getType() {
+		return type;
+	}
+	public String getPact() {
+		return pact;
+	}
+	public String getContents() {
+		return contents;
+	}
+	public int getParallelism() {
+		return parallelism;
+	}
+	public String getDriver_strategy() {
+		return driver_strategy;
+	}
+	public List<Predecessors> getPredecessors() {
+		return predecessors;
+	}
+	public List<Global_properties> getGlobal_properties() {
+		return global_properties;
+	}
+	public List<LocalProperty> getLocal_properties() {
+		return local_properties;
+	}
+	public List<Estimates> getEstimates() {
+		return estimates;
+	}
+	public List<Costs> getCosts() {
+		return costs;
+	}
+	public List<Compiler_hints> getCompiler_hints() {
+		return compiler_hints;
+	}
+}
+
+class Predecessors {
+	private String ship_strategy;
+	private String exchange_mode;
+
+	public String getShip_strategy() {
+		return ship_strategy;
+	}
+	public String getExchange_mode() {
+		return exchange_mode;
+	}
+}
+
+class Global_properties {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}
+
+class LocalProperty {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}
+
+class Estimates {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}
+
+class Costs {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}
+
+class Compiler_hints {
+	private String name;
+	private String value;
+
+	public String getValue() {
+		return value;
+	}
+	public String getName() {
+		return name;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
new file mode 100644
index 0000000..31a7cd68
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.explain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class PlanJsonParser {
+
+	public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
+		ObjectMapper objectMapper = new ObjectMapper();
+
+		//not every node is same, ignore the unknown field
+		objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+		PlanTree tree = objectMapper.readValue(t, PlanTree.class);
+		LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		int tabCount = 0;
+
+		for (int index = 0; index < tree.getNodes().size(); index++) {
+			Node tempNode = tree.getNodes().get(index);
+
+			//input with operation such as join or union is coordinate, keep the same indent 
+			if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact())))
{
+				tabCount = map.get(tempNode.getPact());
+			}
+			else {
+				map.put(tempNode.getPact(), tabCount);
+			}
+
+			printTab(tabCount, pw);
+			pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n");
+
+			printTab(tabCount + 1, pw);
+			String content = tempNode.getContents();
+
+			//drop the hashcode of object instance
+			int dele = tempNode.getContents().indexOf("@");
+			if (dele > -1) content = tempNode.getContents().substring(0, dele);
+			
+			//replace with certain content if node is dataSource to pass
+			//unit tests, because java and scala use different api to
+			//get input element
+			if (tempNode.getPact().equals("Data Source"))
+				content = "collect elements with CollectionInputFormat";
+			pw.print("content : " + content + "\n");
+
+			List<Predecessors> predecessors = tempNode.getPredecessors();
+			if (predecessors != null) {
+				printTab(tabCount + 1, pw);
+				pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
+
+				printTab(tabCount + 1, pw);
+				pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
+			}
+
+			if (tempNode.getDriver_strategy() != null) {
+				printTab(tabCount + 1, pw);
+				pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
+			}
+
+			printTab(tabCount + 1, pw);
+			pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+					+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+
+			if (extended) {
+				List<Global_properties> globalProperties = tempNode.getGlobal_properties();
+				for (int i = 1; i < globalProperties.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(globalProperties.get(i).getName() + " : "
+					+ globalProperties.get(i).getValue() + "\n");
+				}
+
+				List<LocalProperty> localProperties = tempNode.getLocal_properties();
+				for (int i = 0; i < localProperties.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(localProperties.get(i).getName() + " : "
+					+ localProperties.get(i).getValue() + "\n");
+				}
+
+				List<Estimates> estimates = tempNode.getEstimates();
+				for (int i = 0; i < estimates.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(estimates.get(i).getName() + " : "
+					+ estimates.get(i).getValue() + "\n");
+				}
+
+				List<Costs> costs = tempNode.getCosts();
+				for (int i = 0; i < costs.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(costs.get(i).getName() + " : "
+					+ costs.get(i).getValue() + "\n");
+				}
+
+				List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints();
+				for (int i = 0; i < compilerHintses.size(); i++) {
+					printTab(tabCount + 1, pw);
+					pw.print(compilerHintses.get(i).getName() + " : "
+					+ compilerHintses.get(i).getValue() + "\n");
+				}
+			}
+			tabCount++;
+			pw.print("\n");
+		}
+		pw.close();
+		return sw.toString();
+	}
+
+	private static void printTab(int tabCount, PrintWriter pw) {
+		for (int i = 0; i < tabCount; i++)
+			pw.print("\t");
+	}
+}
+
+class PlanTree {
+	private List<Node> nodes;
+
+	public List<Node> getNodes() {
+		return nodes;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
index ca7874b..7ec34d7 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
@@ -35,6 +35,7 @@ sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product
=>
  */
 case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode
{
   val children = Nil
+  override def toString = s"Root($outputFields)"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
new file mode 100644
index 0000000..e73b5a2
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.table.Table;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+
+public class SqlExplainITCase {
+
+	private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile();
+
+	public static class WC {
+		public String word;
+		public int count;
+
+		// Public constructor to make it a Flink POJO
+		public WC() {}
+
+		public WC(int count, String word) {
+			this.word = word;
+			this.count = count;
+		}
+	}
+
+	@Test
+	public void testGroupByWithoutExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<WC> input = env.fromElements(
+				new WC(1,"d"),
+				new WC(2,"d"),
+				new WC(3,"d"));
+
+		Table table = tableEnv.fromDataSet(input).as("a, b");
+
+		String result = table
+				.filter("a % 2 = 0")
+				.explain();
+		String source = new Scanner(new File(testFilePath +
+				"../../src/test/scala/resources/testFilter0.out"))
+				.useDelimiter("\\A").next();
+		assertEquals(result, source);
+	}
+
+	@Test
+	public void testGroupByWithExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<WC> input = env.fromElements(
+				new WC(1, "d"),
+				new WC(2, "d"),
+				new WC(3, "d"));
+
+		Table table = tableEnv.fromDataSet(input).as("a, b");
+
+		String result = table
+				.filter("a % 2 = 0")
+				.explain(true);
+		String source = new Scanner(new File(testFilePath +
+				"../../src/test/scala/resources/testFilter1.out"))
+				.useDelimiter("\\A").next();
+		assertEquals(result, source);
+	}
+
+	@Test
+	public void testJoinWithoutExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<WC> input1 = env.fromElements(
+				new WC(1, "d"),
+				new WC(1, "d"),
+				new WC(1, "d"));
+
+		Table table1 = tableEnv.fromDataSet(input1).as("a, b");
+
+		DataSet<WC> input2 = env.fromElements(
+				new WC(1,"d"),
+				new WC(1,"d"),
+				new WC(1,"d"));
+
+		Table table2 = tableEnv.fromDataSet(input2).as("c, d");
+
+		String result = table1
+				.join(table2)
+				.where("b = d")
+				.select("a, c")
+				.explain();
+		String source = new Scanner(new File(testFilePath +
+				"../../src/test/scala/resources/testJoin0.out"))
+				.useDelimiter("\\A").next();
+		assertEquals(result, source);
+	}
+
+	@Test
+	public void testJoinWithExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<WC> input1 = env.fromElements(
+				new WC(1, "d"),
+				new WC(1, "d"),
+				new WC(1, "d"));
+
+		Table table1 = tableEnv.fromDataSet(input1).as("a, b");
+
+		DataSet<WC> input2 = env.fromElements(
+				new WC(1, "d"),
+				new WC(1, "d"),
+				new WC(1, "d"));
+
+		Table table2 = tableEnv.fromDataSet(input2).as("c, d");
+
+		String result = table1
+				.join(table2)
+				.where("b = d")
+				.select("a, c")
+				.explain(true);
+		String source = new Scanner(new File(testFilePath +
+				"../../src/test/scala/resources/testJoin1.out"))
+				.useDelimiter("\\A").next();
+		assertEquals(result, source);
+	}
+
+	@Test
+	public void testUnionWithoutExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<WC> input1 = env.fromElements(
+				new WC(1, "d"),
+				new WC(1, "d"),
+				new WC(1, "d"));
+
+		Table table1 = tableEnv.fromDataSet(input1);
+
+		DataSet<WC> input2 = env.fromElements(
+				new WC(1, "d"),
+				new WC(1, "d"),
+				new WC(1, "d"));
+
+		Table table2 = tableEnv.fromDataSet(input2);
+
+		String result = table1
+				.unionAll(table2)
+				.explain();
+		String source = new Scanner(new File(testFilePath +
+				"../../src/test/scala/resources/testUnion0.out"))
+				.useDelimiter("\\A").next();
+		assertEquals(result, source);
+	}
+
+	@Test
+	public void testUnionWithExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<WC> input1 = env.fromElements(
+				new WC(1, "d"),
+				new WC(1, "d"),
+				new WC(1, "d"));
+
+		Table table1 = tableEnv.fromDataSet(input1);
+
+		DataSet<WC> input2 = env.fromElements(
+				new WC(1, "d"),
+				new WC(1, "d"),
+				new WC(1, "d"));
+
+		Table table2 = tableEnv.fromDataSet(input2);
+
+		String result = table1
+				.unionAll(table2)
+				.explain(true);
+		String source = new Scanner(new File(testFilePath +
+				"../../src/test/scala/resources/testUnion1.out"))
+				.useDelimiter("\\A").next();
+		assertEquals(result, source);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
new file mode 100644
index 0000000..bead02f
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainITCase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+import org.junit._
+import org.junit.Assert.assertEquals
+
+case class WC(count: Int, word: String)
+
+class SqlExplainITCase {
+
+  val testFilePath = SqlExplainITCase.this.getClass.getResource("/").getFile
+
+  @Test
+  def testGroupByWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a,
'b)
+    val result = expr.filter("a % 2 = 0").explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testGroupByWithExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao")).toTable.as('a,
'b)
+    val result = expr.filter("a % 2 = 0").explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter1.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testJoinWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a,
'b)
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c,
'd)
+    val result = expr1.join(expr2).where("b = d").select("a, c").explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testJoinWithExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable.as('a,
'b)
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable.as('c,
'd)
+    val result = expr1.join(expr2).where("b = d").select("a, c").explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin1.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testUnionWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
+    val result = expr1.unionAll(expr2).explain()
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion0.out").mkString
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testUnionWithExtended() : Unit = {
+    val env = ExecutionEnvironment.createLocalEnvironment()
+    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
+    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
+    val result = expr1.unionAll(expr2).explain(true)
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion1.out").mkString
+    assertEquals(result, source)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter0.out b/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
new file mode 100644
index 0000000..062fc90
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testFilter0.out
@@ -0,0 +1,28 @@
+== Abstract Syntax Tree ==
+Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	Stage 2 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		Stage 1 : Filter
+			content : ('a * 2) === 0
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			driver_strategy : FlatMap
+			Partitioning : RANDOM_PARTITIONED
+
+			Stage 0 : Data Sink
+				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testFilter1.out b/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
new file mode 100644
index 0000000..83378e6
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testFilter1.out
@@ -0,0 +1,96 @@
+== Abstract Syntax Tree ==
+Filter(As(Root(ArraySeq((count,Integer), (word,String))), a,b), ('a * 2) === 0)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+	Partitioning Order : (none)
+	Uniqueness : not unique
+	Order : (none)
+	Grouping : not grouped
+	Uniqueness : not unique
+	Est. Output Size : (unknown)
+	Est. Cardinality : (unknown)
+	Network : 0.0
+	Disk I/O : 0.0
+	CPU : 0.0
+	Cumulative Network : 0.0
+	Cumulative Disk I/O : 0.0
+	Cumulative CPU : 0.0
+	Output Size (bytes) : (none)
+	Output Cardinality : (none)
+	Avg. Output Record Size (bytes) : (none)
+	Filter Factor : (none)
+
+	Stage 2 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+		Partitioning Order : (none)
+		Uniqueness : not unique
+		Order : (none)
+		Grouping : not grouped
+		Uniqueness : not unique
+		Est. Output Size : (unknown)
+		Est. Cardinality : (unknown)
+		Network : 0.0
+		Disk I/O : 0.0
+		CPU : 0.0
+		Cumulative Network : 0.0
+		Cumulative Disk I/O : 0.0
+		Cumulative CPU : 0.0
+		Output Size (bytes) : (none)
+		Output Cardinality : (none)
+		Avg. Output Record Size (bytes) : (none)
+		Filter Factor : (none)
+
+		Stage 1 : Filter
+			content : ('a * 2) === 0
+			ship_strategy : Forward
+			exchange_mode : PIPELINED
+			driver_strategy : FlatMap
+			Partitioning : RANDOM_PARTITIONED
+			Partitioning Order : (none)
+			Uniqueness : not unique
+			Order : (none)
+			Grouping : not grouped
+			Uniqueness : not unique
+			Est. Output Size : 0.0
+			Est. Cardinality : 0.0
+			Network : 0.0
+			Disk I/O : 0.0
+			CPU : 0.0
+			Cumulative Network : 0.0
+			Cumulative Disk I/O : 0.0
+			Cumulative CPU : 0.0
+			Output Size (bytes) : (none)
+			Output Cardinality : (none)
+			Avg. Output Record Size (bytes) : (none)
+			Filter Factor : (none)
+
+			Stage 0 : Data Sink
+				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				Partitioning : RANDOM_PARTITIONED
+				Partitioning Order : (none)
+				Uniqueness : not unique
+				Order : (none)
+				Grouping : not grouped
+				Uniqueness : not unique
+				Est. Output Size : 0.0
+				Est. Cardinality : 0.0
+				Network : 0.0
+				Disk I/O : 0.0
+				CPU : 0.0
+				Cumulative Network : 0.0
+				Cumulative Disk I/O : 0.0
+				Cumulative CPU : 0.0
+				Output Size (bytes) : (none)
+				Output Cardinality : (none)
+				Avg. Output Record Size (bytes) : (none)
+				Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin0.out b/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
new file mode 100644
index 0000000..e6e30be
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testJoin0.out
@@ -0,0 +1,39 @@
+== Abstract Syntax Tree ==
+Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer),
(word,String))), c,d)), 'b === 'd), 'a,'c)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	Stage 2 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+Stage 5 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	Stage 4 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		Stage 1 : Join
+			content : Join at 'b === 'd
+			ship_strategy : Hash Partition on [1]
+			exchange_mode : PIPELINED
+			driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
+			Partitioning : RANDOM_PARTITIONED
+
+			Stage 0 : Data Sink
+				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testJoin1.out b/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
new file mode 100644
index 0000000..a8f05dd
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testJoin1.out
@@ -0,0 +1,141 @@
+== Abstract Syntax Tree ==
+Select(Filter(Join(As(Root(ArraySeq((count,Integer), (word,String))), a,b), As(Root(ArraySeq((count,Integer),
(word,String))), c,d)), 'b === 'd), 'a,'c)
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+	Partitioning Order : (none)
+	Uniqueness : not unique
+	Order : (none)
+	Grouping : not grouped
+	Uniqueness : not unique
+	Est. Output Size : (unknown)
+	Est. Cardinality : (unknown)
+	Network : 0.0
+	Disk I/O : 0.0
+	CPU : 0.0
+	Cumulative Network : 0.0
+	Cumulative Disk I/O : 0.0
+	Cumulative CPU : 0.0
+	Output Size (bytes) : (none)
+	Output Cardinality : (none)
+	Avg. Output Record Size (bytes) : (none)
+	Filter Factor : (none)
+
+	Stage 2 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+		Partitioning Order : (none)
+		Uniqueness : not unique
+		Order : (none)
+		Grouping : not grouped
+		Uniqueness : not unique
+		Est. Output Size : (unknown)
+		Est. Cardinality : (unknown)
+		Network : 0.0
+		Disk I/O : 0.0
+		CPU : 0.0
+		Cumulative Network : 0.0
+		Cumulative Disk I/O : 0.0
+		Cumulative CPU : 0.0
+		Output Size (bytes) : (none)
+		Output Cardinality : (none)
+		Avg. Output Record Size (bytes) : (none)
+		Filter Factor : (none)
+
+Stage 5 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+	Partitioning Order : (none)
+	Uniqueness : not unique
+	Order : (none)
+	Grouping : not grouped
+	Uniqueness : not unique
+	Est. Output Size : (unknown)
+	Est. Cardinality : (unknown)
+	Network : 0.0
+	Disk I/O : 0.0
+	CPU : 0.0
+	Cumulative Network : 0.0
+	Cumulative Disk I/O : 0.0
+	Cumulative CPU : 0.0
+	Output Size (bytes) : (none)
+	Output Cardinality : (none)
+	Avg. Output Record Size (bytes) : (none)
+	Filter Factor : (none)
+
+	Stage 4 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+		Partitioning Order : (none)
+		Uniqueness : not unique
+		Order : (none)
+		Grouping : not grouped
+		Uniqueness : not unique
+		Est. Output Size : (unknown)
+		Est. Cardinality : (unknown)
+		Network : 0.0
+		Disk I/O : 0.0
+		CPU : 0.0
+		Cumulative Network : 0.0
+		Cumulative Disk I/O : 0.0
+		Cumulative CPU : 0.0
+		Output Size (bytes) : (none)
+		Output Cardinality : (none)
+		Avg. Output Record Size (bytes) : (none)
+		Filter Factor : (none)
+
+		Stage 1 : Join
+			content : Join at 'b === 'd
+			ship_strategy : Hash Partition on [1]
+			exchange_mode : PIPELINED
+			driver_strategy : Hybrid Hash (build: Map at select('count as 'count,'word as 'word))
+			Partitioning : RANDOM_PARTITIONED
+			Partitioning Order : (none)
+			Uniqueness : not unique
+			Order : (none)
+			Grouping : not grouped
+			Uniqueness : not unique
+			Est. Output Size : (unknown)
+			Est. Cardinality : (unknown)
+			Network : (unknown)
+			Disk I/O : (unknown)
+			CPU : (unknown)
+			Cumulative Network : (unknown)
+			Cumulative Disk I/O : (unknown)
+			Cumulative CPU : (unknown)
+			Output Size (bytes) : (none)
+			Output Cardinality : (none)
+			Avg. Output Record Size (bytes) : (none)
+			Filter Factor : (none)
+
+			Stage 0 : Data Sink
+				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				Partitioning : RANDOM_PARTITIONED
+				Partitioning Order : (none)
+				Uniqueness : not unique
+				Order : (none)
+				Grouping : not grouped
+				Uniqueness : not unique
+				Est. Output Size : (unknown)
+				Est. Cardinality : (unknown)
+				Network : 0.0
+				Disk I/O : 0.0
+				CPU : 0.0
+				Cumulative Network : (unknown)
+				Cumulative Disk I/O : (unknown)
+				Cumulative CPU : (unknown)
+				Output Size (bytes) : (none)
+				Output Cardinality : (none)
+				Avg. Output Record Size (bytes) : (none)
+				Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion0.out b/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
new file mode 100644
index 0000000..db9d2f9
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testUnion0.out
@@ -0,0 +1,38 @@
+== Abstract Syntax Tree ==
+Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	Stage 2 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+Stage 5 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+	Stage 4 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+
+		Stage 1 : Union
+			content : 
+			ship_strategy : Redistribute
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+
+			Stage 0 : Data Sink
+				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				Partitioning : RANDOM_PARTITIONED
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/resources/testUnion1.out b/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
new file mode 100644
index 0000000..8dc1e53
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/resources/testUnion1.out
@@ -0,0 +1,140 @@
+== Abstract Syntax Tree ==
+Union(Root(ArraySeq((count,Integer), (word,String))), Root(ArraySeq((count,Integer), (word,String))))
+
+== Physical Execution Plan ==
+Stage 3 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+	Partitioning Order : (none)
+	Uniqueness : not unique
+	Order : (none)
+	Grouping : not grouped
+	Uniqueness : not unique
+	Est. Output Size : (unknown)
+	Est. Cardinality : (unknown)
+	Network : 0.0
+	Disk I/O : 0.0
+	CPU : 0.0
+	Cumulative Network : 0.0
+	Cumulative Disk I/O : 0.0
+	Cumulative CPU : 0.0
+	Output Size (bytes) : (none)
+	Output Cardinality : (none)
+	Avg. Output Record Size (bytes) : (none)
+	Filter Factor : (none)
+
+	Stage 2 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+		Partitioning Order : (none)
+		Uniqueness : not unique
+		Order : (none)
+		Grouping : not grouped
+		Uniqueness : not unique
+		Est. Output Size : (unknown)
+		Est. Cardinality : (unknown)
+		Network : 0.0
+		Disk I/O : 0.0
+		CPU : 0.0
+		Cumulative Network : 0.0
+		Cumulative Disk I/O : 0.0
+		Cumulative CPU : 0.0
+		Output Size (bytes) : (none)
+		Output Cardinality : (none)
+		Avg. Output Record Size (bytes) : (none)
+		Filter Factor : (none)
+
+Stage 5 : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+	Partitioning Order : (none)
+	Uniqueness : not unique
+	Order : (none)
+	Grouping : not grouped
+	Uniqueness : not unique
+	Est. Output Size : (unknown)
+	Est. Cardinality : (unknown)
+	Network : 0.0
+	Disk I/O : 0.0
+	CPU : 0.0
+	Cumulative Network : 0.0
+	Cumulative Disk I/O : 0.0
+	Cumulative CPU : 0.0
+	Output Size (bytes) : (none)
+	Output Cardinality : (none)
+	Avg. Output Record Size (bytes) : (none)
+	Filter Factor : (none)
+
+	Stage 4 : Map
+		content : Map at select('count as 'count,'word as 'word)
+		ship_strategy : Forward
+		exchange_mode : PIPELINED
+		driver_strategy : Map
+		Partitioning : RANDOM_PARTITIONED
+		Partitioning Order : (none)
+		Uniqueness : not unique
+		Order : (none)
+		Grouping : not grouped
+		Uniqueness : not unique
+		Est. Output Size : (unknown)
+		Est. Cardinality : (unknown)
+		Network : 0.0
+		Disk I/O : 0.0
+		CPU : 0.0
+		Cumulative Network : 0.0
+		Cumulative Disk I/O : 0.0
+		Cumulative CPU : 0.0
+		Output Size (bytes) : (none)
+		Output Cardinality : (none)
+		Avg. Output Record Size (bytes) : (none)
+		Filter Factor : (none)
+
+		Stage 1 : Union
+			content : 
+			ship_strategy : Redistribute
+			exchange_mode : PIPELINED
+			Partitioning : RANDOM_PARTITIONED
+			Partitioning Order : (none)
+			Uniqueness : not unique
+			Order : (none)
+			Grouping : not grouped
+			Uniqueness : not unique
+			Est. Output Size : (unknown)
+			Est. Cardinality : (unknown)
+			Network : 0.0
+			Disk I/O : 0.0
+			CPU : 0.0
+			Cumulative Network : (unknown)
+			Cumulative Disk I/O : 0.0
+			Cumulative CPU : 0.0
+			Output Size (bytes) : (none)
+			Output Cardinality : (none)
+			Avg. Output Record Size (bytes) : (none)
+			Filter Factor : (none)
+
+			Stage 0 : Data Sink
+				content : org.apache.flink.api.java.io.DiscardingOutputFormat
+				ship_strategy : Forward
+				exchange_mode : PIPELINED
+				Partitioning : RANDOM_PARTITIONED
+				Partitioning Order : (none)
+				Uniqueness : not unique
+				Order : (none)
+				Grouping : not grouped
+				Uniqueness : not unique
+				Est. Output Size : (unknown)
+				Est. Cardinality : (unknown)
+				Network : 0.0
+				Disk I/O : 0.0
+				CPU : 0.0
+				Cumulative Network : (unknown)
+				Cumulative Disk I/O : 0.0
+				Cumulative CPU : 0.0
+				Output Size (bytes) : (none)
+				Output Cardinality : (none)
+				Avg. Output Record Size (bytes) : (none)
+				Filter Factor : (none)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dbbab0a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 91e4ca1..091552d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -797,6 +797,7 @@ under the License.
 						<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
 						<exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude>
 						<exclude>out/test/flink-avro/avro/user.avsc</exclude>
+						<exclude>flink-staging/flink-table/src/test/scala/resources/*.out</exclude>
 						<!-- TweetInputFormat Test Data-->
 						<exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>
 						<exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude>


Mime
View raw message