flink-commits mailing list archives

From fhue...@apache.org
Subject [3/5] flink git commit: [FLINK-4623] [table] Add physical execution plan to StreamTableEnvironment explain().
Date Wed, 02 Nov 2016 20:46:41 GMT
[FLINK-4623] [table] Add physical execution plan to StreamTableEnvironment explain().

This closes #2720.
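With this change, StreamTableEnvironment.explain(Table) no longer returns only the abstract syntax tree: it also appends the physical execution plan of the translated DataStream program. A minimal usage sketch in Scala (the environment, table, and field names are illustrative and mirror the updated ExplainStreamTest further down; imports may need adjusting to the surrounding code base):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Same pipeline as testFilter in ExplainStreamTest below.
    val table = env.fromElements((1, "hello"))
      .toTable(tEnv, 'a, 'b)
      .filter("a % 2 = 0")

    // Prints an "== Abstract Syntax Tree ==" section followed by an
    // "== Physical Execution Plan ==" section (compare testFilterStream0.out below).
    println(tEnv.explain(table))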


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d60fe723
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d60fe723
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d60fe723

Branch: refs/heads/master
Commit: d60fe723aa357733c6ad8715b0e8c4e55ab7f52d
Parents: ed6a602
Author: anton solovev <anton_solovev@epam.com>
Authored: Tue Oct 25 15:55:42 2016 +0400
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Nov 2 18:30:18 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/explain/PlanJsonParser.java | 15 ++++++++-----
 .../api/table/StreamTableEnvironment.scala      | 18 +++++++++++++---
 .../api/scala/stream/ExplainStreamTest.scala    | 22 ++++++++++++++------
 .../test/scala/resources/testFilterStream0.out  | 13 ++++++++++++
 .../test/scala/resources/testUnionStream0.out   | 16 ++++++++++++++
 5 files changed, 70 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
index 3c4d3d9..bd14cd2 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java
@@ -62,7 +62,7 @@ public class PlanJsonParser {
 			if (dele > -1) {
 				content = tempNode.getContents().substring(0, dele);
 			}
-			
+
 			//replace with certain content if node is dataSource to pass
 			//unit tests, because java and scala use different api to
 			//get input element
@@ -76,8 +76,11 @@ public class PlanJsonParser {
 				printTab(tabCount + 1, pw);
 				pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
 
-				printTab(tabCount + 1, pw);
-				pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
+				String mode = predecessors.get(0).getExchange_mode();
+				if (mode != null) {
+					printTab(tabCount + 1, pw);
+					pw.print("exchange_mode : " + mode + "\n");
+				}
 			}
 
 			if (tempNode.getDriver_strategy() != null) {
@@ -85,9 +88,11 @@ public class PlanJsonParser {
 				pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
 			}
 
-			printTab(tabCount + 1, pw);
-			pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+			if (tempNode.getGlobal_properties() != null) {
+				printTab(tabCount + 1, pw);
+				pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
 					+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+			}
 
 			if (extended) {
 				List<Global_properties> globalProperties = tempNode.getGlobal_properties();

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index b9e889d..bca8d79 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
@@ -311,14 +313,24 @@ abstract class StreamTableEnvironment(
     *
     * @param table The table for which the AST and execution plan will be returned.
     */
-   def explain(table: Table): String = {
+  def explain(table: Table): String = {
 
     val ast = RelOptUtil.toString(table.getRelNode)
 
+    val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
+
+    val env = dataStream.getExecutionEnvironment
+    val jsonSqlPlan = env.getExecutionPlan
+
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
+
     s"== Abstract Syntax Tree ==" +
       System.lineSeparator +
-      ast
-
+      s"$ast" +
+      System.lineSeparator +
+      s"== Physical Execution Plan ==" +
+      System.lineSeparator +
+      s"$sqlPlan"
   }
 
 }
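For context, the new explain() first translates the Table into a DataStream[Row], then asks the stream environment for its JSON execution plan and renders it through PlanJsonParser. The same two calls, sketched in isolation (env is an illustrative StreamExecutionEnvironment on which transformations have already been defined):

    // Sketch only: env is assumed to carry at least one transformation,
    // otherwise there is no meaningful plan to print.
    val jsonPlan: String = env.getExecutionPlan
    // false selects the non-extended rendering, as used by explain() above.
    val physicalPlan: String = PlanJsonParser.getSqlExecutionPlan(jsonPlan, false)
    println(physicalPlan)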

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
index 71500f1..5eebb34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
@@ -40,10 +40,12 @@ class ExplainStreamTest
       .toTable(tEnv, 'a, 'b)
       .filter("a % 2 = 0")
 
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val result = replaceString(tEnv.explain(table))
+
     val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n",
"\n")
-    assertEquals(result, source)
+      "../../src/test/scala/resources/testFilterStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(result, expect)
   }
 
   @Test
@@ -55,10 +57,18 @@ class ExplainStreamTest
     val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
     val table = table1.unionAll(table2)
 
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val result = replaceString(tEnv.explain(table))
+
     val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnionStream0.out").mkString.replaceAll("\\r\\n",
"\n")
-    assertEquals(result, source)
+      "../../src/test/scala/resources/testUnionStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(result, expect)
   }
 
+  def replaceString(s: String): String = {
+    /* Stage {id} is ignored, because id keeps incrementing in test class
+     * while StreamExecutionEnvironment is up
+     */
+    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+  }
 }
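The replaceString helper added above keeps the comparison stable: it normalizes Windows line endings and strips the stage ids, which keep incrementing while the shared StreamExecutionEnvironment is up. A quick illustration of its effect (the input string is invented for the example):

    // Given the helper defined in the test class above:
    replaceString("Stage 12 : Operator\r\n")   // yields " : Operator\n"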

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
index 3fda6de..20ae2b1 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out
@@ -1,3 +1,16 @@
 == Abstract Syntax Tree ==
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
   LogicalTableScan(table=[[_DataStreamTable_0]])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 2 : Operator
+		content : from: (a, b)
+		ship_strategy : REBALANCE
+
+		Stage 3 : Operator
+			content : where: (=(MOD(a, 2), 0)), select: (a, b)
+			ship_strategy : FORWARD
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d60fe723/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index b2e3000..ac3635d 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -2,3 +2,19 @@
 LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataStreamTable_0]])
   LogicalTableScan(table=[[_DataStreamTable_1]])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+
+		Stage 4 : Operator
+			content : from: (count, word)
+			ship_strategy : REBALANCE
+

