flink-commits mailing list archives

From twal...@apache.org
Subject flink git commit: [FLINK-4420] [table] Introduce star(*) to select all of the columns in the table
Date Mon, 29 Aug 2016 14:02:07 GMT
Repository: flink
Updated Branches:
  refs/heads/master c2585c6c8 -> 1f1788619


[FLINK-4420] [table] Introduce star(*) to select all of the columns in the table

This closes #2384.
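
For reference, a minimal usage sketch of the new wildcard (Scala Table API; it mirrors the
documentation and ITCase changes below and assumes a DataSet ds and a tableEnv as in the docs
examples):

    val in = ds.toTable(tableEnv, 'a, 'b, 'c)
    val all = in.select('*)           // selects all columns, i.e. the same as select('a, 'b, 'c)

    // Java / string-based variant, as documented in docs/dev/table_api.md below:
    // Table result = in.select("*");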


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f178861
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f178861
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f178861

Branch: refs/heads/master
Commit: 1f17886198eb67c8fdb53b96098ebb812029b3ba
Parents: c2585c6
Author: Jark Wu <wuchong.wc@alibaba-inc.com>
Authored: Thu Aug 18 13:08:01 2016 +0800
Committer: twalthr <twalthr@apache.org>
Committed: Mon Aug 29 15:56:05 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  9 ++++
 .../flink/api/table/TableEnvironment.scala      | 10 ++++
 .../table/expressions/ExpressionParser.scala    |  3 +-
 .../api/table/expressions/fieldExpression.scala | 11 +++-
 .../api/table/plan/RexNodeTranslator.scala      | 17 +++++++
 .../api/table/plan/logical/operators.scala      |  6 ++-
 .../org/apache/flink/api/table/table.scala      |  8 +--
 .../api/java/batch/table/SelectITCase.java      | 23 +++++++++
 .../api/scala/batch/table/SelectITCase.scala    | 53 ++++++++++++++++++++
 9 files changed, 132 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 7a20e6a..8ca602d 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -501,6 +501,10 @@ This section gives a brief overview of the available operators. You can find mor
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.select("a, c as d");
 {% endhighlight %}
+        <p>You can use star (<code>*</code>) to act as a wild card, selecting all of the columns in the table.</p>
+{% highlight java %}
+Table result = in.select("*");
+{% endhighlight %}
       </td>
     </tr>
 
@@ -723,6 +727,11 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w
 val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.select('a, 'c as 'd);
 {% endhighlight %}
+        <p>You can use star (<code>*</code>) to act as a wild card, selecting all of the columns in the table.</p>
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.select('*);
+{% endhighlight %}
       </td>
     </tr>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 8f61540..d7e650c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -265,6 +265,11 @@ abstract class TableEnvironment(val config: TableConfig) {
         throw new TableException(s"Type $tpe lacks explicit field naming")
     }
     val fieldIndexes = fieldNames.indices.toArray
+
+    if (fieldNames.contains("*")) {
+      throw new ValidationException("Field name can not be '*'.")
+    }
+
     (fieldNames, fieldIndexes)
   }
 
@@ -336,6 +341,11 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
 
     val (fieldIndexes, fieldNames) = indexedNames.unzip
+
+    if (fieldNames.contains("*")) {
+      throw new ValidationException("Field name can not be '*'.")
+    }
+
     (fieldNames.toArray, fieldIndexes.toArray)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index c57d43b..cb92573 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -75,6 +75,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val MINUTE: Keyword = Keyword("minute")
   lazy val SECOND: Keyword = Keyword("second")
   lazy val MILLI: Keyword = Keyword("milli")
+  lazy val STAR: Keyword = Keyword("*")
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -159,7 +160,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       stringLiteralFlink | singleQuoteStringLiteral |
       boolLiteral | nullLiteral
 
-  lazy val fieldReference: PackratParser[NamedExpression] = ident ^^ {
+  lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
     sym => UnresolvedFieldReference(sym)
   }
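
A rough sketch of how the new STAR keyword surfaces (hedged; it assumes the existing
parseExpression entry point used by the string-based API, presumably via select(String) in
table.scala):

    // "*" now matches fieldReference and becomes a field reference named "*";
    // expandProjectList (see RexNodeTranslator below) later expands it to all columns.
    val star = ExpressionParser.parseExpression("*")   // UnresolvedFieldReference("*")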
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
index 0219d38..5f20751 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.tools.RelBuilder
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.UnresolvedException
-import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure}
+import org.apache.flink.api.table.validate.{ValidationSuccess, ExprValidationResult,
+ValidationFailure}
 
 trait NamedExpression extends Expression {
   private[flink] def name: String
@@ -91,6 +92,14 @@ case class Alias(child: Expression, name: String)
       UnresolvedFieldReference(name)
     }
   }
+
+  override private[flink] def validateInput(): ExprValidationResult = {
+    if (name == "*") {
+      ValidationFailure("Alias can not accept '*' as name.")
+    } else {
+      ValidationSuccess
+    }
+  }
 }
 
 case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index ae8c7c3..eb40bba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -20,6 +20,9 @@ package org.apache.flink.api.table.plan
 
 import org.apache.flink.api.table.TableEnvironment
 import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.plan.logical.LogicalNode
+
+import scala.collection.mutable.ListBuffer
 
 object RexNodeTranslator {
 
@@ -68,4 +71,18 @@ object RexNodeTranslator {
         (e.makeCopy(newArgs.map(_._1).toArray), newArgs.flatMap(_._2).toList)
     }
   }
+
+  /**
+    * Parses all input expressions to [[UnresolvedAlias]].
+    * And expands star to parent's full project list.
+    */
+  def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[NamedExpression] = {
+    val projectList = new ListBuffer[NamedExpression]
+    exprs.foreach {
+      case n: UnresolvedFieldReference if n.name == "*" =>
+        projectList ++= parent.output.map(UnresolvedAlias(_))
+      case e: Expression => projectList += UnresolvedAlias(e)
+    }
+    projectList
+  }
 }
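
The expandProjectList helper above is what turns '*' into the parent's full output. A
simplified, self-contained analogue of the expansion rule (plain Scala, not the Flink types):

    // a "*" entry is replaced by all of the parent's output fields; anything else is kept as-is
    def expand(exprs: Seq[String], parentOutput: Seq[String]): Seq[String] =
      exprs.flatMap {
        case "*" => parentOutput      // star -> full project list
        case e   => Seq(e)
      }

    expand(Seq("*"), Seq("a", "b", "c"))   // List(a, b, c), i.e. select('*) == select('a, 'b, 'c)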

http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index c33efd0..ccdab85 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -62,14 +62,14 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
         case n: Alias =>
           // explicit name
           if (names.contains(n.name)) {
-            throw ValidationException(s"Duplicate field name $n.name.")
+            throw ValidationException(s"Duplicate field name ${n.name}.")
           } else {
             names.add(n.name)
           }
         case r: ResolvedFieldReference =>
           // simple field forwarding
           if (names.contains(r.name)) {
-            throw ValidationException(s"Duplicate field name $r.name.")
+            throw ValidationException(s"Duplicate field name ${r.name}.")
           } else {
             names.add(r.name)
           }
@@ -109,6 +109,8 @@ case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends Una
       failValidation("Aliasing more fields than we actually have")
     } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
       failValidation("Alias only accept name expressions as arguments")
+    } else if (!aliasList.forall(_.asInstanceOf[UnresolvedFieldReference].name != "*")) {
+      failValidation("Alias can not accept '*' as name")
     } else {
       val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
       val input = child.output

http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index bfabd32..9d96780 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.table.expressions.{Asc, ExpressionParser, UnresolvedAlias, Expression,
Ordering}
-import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations
+import org.apache.flink.api.table.plan.RexNodeTranslator._
 import org.apache.flink.api.table.plan.logical._
 import org.apache.flink.api.table.sinks.TableSink
 
@@ -78,14 +78,14 @@ class Table(
   def select(fields: Expression*): Table = {
     val projectionOnAggregates = fields.map(extractAggregations(_, tableEnv))
     val aggregations = projectionOnAggregates.flatMap(_._2)
+    val projectList = expandProjectList(projectionOnAggregates.map(_._1), logicalPlan)
     if (aggregations.nonEmpty) {
       new Table(tableEnv,
-        Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+        Project(projectList,
           Aggregate(Nil, aggregations, logicalPlan).validate(tableEnv)).validate(tableEnv))
     } else {
       new Table(tableEnv,
-        Project(
-          projectionOnAggregates.map(e => UnresolvedAlias(e._1)), logicalPlan).validate(tableEnv))
+        Project(projectList, logicalPlan).validate(tableEnv))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
index e48914c..581c8ed 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
@@ -127,4 +127,27 @@ public class SelectITCase extends TableProgramsTestBase {
 			// Must fail. Field foo does not exist
 			.select("a + 1 as foo, b + 2 as foo");
 	}
+
+	@Test
+	public void testSelectStar() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+		Table result = in
+			.select("*");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+		                  "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+		                  "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+		                  "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+		                  "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+		                  "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+		                  "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
index 9aed5a7..1143afd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -134,4 +135,56 @@ class SelectITCase(
       .select('a, 'b as 'a).toDataSet[Row].print()
   }
 
+  @Test
+  def testSelectStar(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAliasStarException(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+  }
+
 }

