flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [27/50] [abbrv] flink git commit: [FLINK-3486] [tableAPI] Fix broken renaming of all fields.
Date Fri, 18 Mar 2016 13:48:21 GMT
[FLINK-3486] [tableAPI] Fix broken renaming of all fields.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/805e425c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/805e425c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/805e425c

Branch: refs/heads/master
Commit: 805e425c941d43f8be8a63ff821f7982ee81bf67
Parents: 8fa4a99
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Feb 29 16:34:24 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Fri Mar 18 14:44:50 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/api/table/table.scala      | 31 ++++++++++++++++++--
 .../flink/api/java/table/test/SelectITCase.java | 21 +++++++++++++
 .../api/scala/table/test/SelectITCase.scala     | 15 ++++++++++
 3 files changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/805e425c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 138cd70..6a623b5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -20,10 +20,13 @@ package org.apache.flink.api.table
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataTypeField
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.RexNode
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexCall, RexNode}
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
-import org.apache.flink.api.table.plan.RexNodeTranslator
+import org.apache.calcite.util.NlsString
+import org.apache.flink.api.table.plan.{PlanGenException, RexNodeTranslator}
 import RexNodeTranslator.{toRexNode, extractAggCalls}
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.parser.ExpressionParser
@@ -98,7 +101,29 @@ class Table(
       .map(toRexNode(_, relBuilder))
 
     relBuilder.project(exprs.toIterable.asJava)
-    new Table(relBuilder.build(), relBuilder)
+    var projected = relBuilder.build()
+
+    if(relNode == projected) {
+      // Calcite's RelBuilder does not translate identity projects even if they rename fields.
+      //   Add a projection ourselves (will be automatically removed by translation rules).
+      val names = exprs.map{ e =>
+        e.getKind match {
+          case SqlKind.AS =>
+            e.asInstanceOf[RexCall].getOperands.get(1)
+              .asInstanceOf[RexLiteral].getValue
+              .asInstanceOf[NlsString].getValue
+          case SqlKind.INPUT_REF =>
+            relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
+          case _ =>
+            throw new PlanGenException("Unexpected expression type encountered.")
+        }
+
+      }
+
+      projected = LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava)
+    }
+
+    new Table(projected, relBuilder)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/805e425c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index 3ce9891..82a2e4a 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -85,6 +85,27 @@ public class SelectITCase extends TableProgramsTestBase {
 		compareResultAsText(results, expected);
 	}
 
+	@Test
+	public void testSimpleSelectRenameAll() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = getJavaTableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		Table in = tableEnv.fromDataSet(ds);
+
+		Table result = in
+			.select("f0 as a, f1 as b, f2 as c")
+			.select("a, b");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+			"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+			"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testAsWithToFewFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/805e425c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 94438c6..111aaeb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -87,6 +87,21 @@ class SelectITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testSimpleSelectRenameAll(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
+      .select('a, 'b)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = t.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
   @Test(expected = classOf[IllegalArgumentException])
   def testAsWithToFewFields(): Unit = {
 


Mime
View raw message