flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [2/2] flink git commit: [FLINK-3226] Improvements for expected types
Date Thu, 25 Feb 2016 19:04:47 GMT
[FLINK-3226] Improvements for expected types

This closes #1709


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b36401d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b36401d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b36401d

Branch: refs/heads/tableOnCalcite
Commit: 2b36401d4cca381188e5adb87206d0ba4a08ccd7
Parents: 61bc3a2
Author: twalthr <twalthr@apache.org>
Authored: Thu Feb 25 09:36:32 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Thu Feb 25 20:04:10 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/plan/TypeConverter.scala    | 20 ++++++++++++++++++++
 .../plan/nodes/dataset/DataSetFlatMap.scala     |  2 +-
 .../plan/nodes/dataset/DataSetGroupReduce.scala | 18 +++++++++++++-----
 .../table/plan/nodes/dataset/DataSetRel.scala   | 10 +++++++++-
 .../flink/api/scala/table/test/CalcITCase.scala |  7 +++----
 .../api/scala/table/test/FilterITCase.scala     |  1 -
 6 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b36401d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index 8a77b3b..c8e999d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -37,6 +37,8 @@ import scala.collection.JavaConversions._
 
 object TypeConverter {
 
+  val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]]
+
   def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
     case BOOLEAN_TYPE_INFO => BOOLEAN
     case BOOLEAN_VALUE_TYPE_INFO => BOOLEAN
@@ -83,6 +85,24 @@ object TypeConverter {
       ??? // TODO more types
   }
 
+  /**
+    * Determines the return type of Flink operators based on the logical fields, the expected
+    * physical type and configuration parameters.
+    *
+    * For example:
+    *   - No physical type expected, only 3 non-null fields and efficient type usage enabled
+    *       -> return Tuple3
+    *   - No physical type expected, efficient type usage enabled, but 3 nullable fields
+    *       -> return Row because Tuple does not support null values
+    *   - Physical type expected
+    *       -> check if physical type is compatible and return it
+    *
+    * @param logicalRowType logical row information
+    * @param expectedPhysicalType expected physical type
+    * @param nullable fields can be nullable
+    * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
+    * @return suitable return type
+    */
   def determineReturnType(
       logicalRowType: RelDataType,
       expectedPhysicalType: Option[TypeInformation[Any]],

http://git-wip-us.apache.org/repos/asf/flink/blob/2b36401d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
index da49ec8..9744792 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
@@ -62,7 +62,7 @@ class DataSetFlatMap(
 
   override def translateToPlan(config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
+    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
     val returnType = determineReturnType(
       getRowType,
       expectedType,

http://git-wip-us.apache.org/repos/asf/flink/blob/2b36401d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
index afe09bb..af72522 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -24,12 +24,11 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{TableConfig, Row}
-import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter}
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.{Row, TableConfig}
+
 import scala.collection.JavaConverters._
-import org.apache.flink.api.table.plan.TypeConverter
 
 /**
   * Flink RelNode which matches along with ReduceGroupOperator.
@@ -67,7 +66,16 @@ class DataSetGroupReduce(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
+    expectedType match {
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        throw new PlanGenException("GroupReduce operations currently only support returning Rows.")
+      case _ => // ok
+    }
+
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
+      config,
+      // tell the input operator that this operator currently only supports Rows as input
+      Some(TypeConverter.DEFAULT_ROW_TYPE))
 
     // get the output types
     val fieldsNames = rowType.getFieldNames

http://git-wip-us.apache.org/repos/asf/flink/blob/2b36401d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index 16a0ae3..35e23f7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -26,7 +26,15 @@ import org.apache.flink.api.table.TableConfig
 trait DataSetRel extends RelNode {
 
   /**
-    * Translate the FlinkRelNode into Flink operator.
+    * Translates the FlinkRelNode into a Flink operator.
+    *
+    * @param config runtime configuration
+    * @param expectedType specifies the type the Flink operator should return. The type must
+    *                     have the same arity as the result. For instance, if the
+    *                     expected type is a RowTypeInfo this method will return a DataSet of
+    *                     type Row. If the expected type is Tuple2, the operator will return
+    *                     a Tuple2 if possible, and a Row otherwise.
+    * @return DataSet of type expectedType or RowTypeInfo
     */
   def translateToPlan(
       config: TableConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/2b36401d/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
index ebbecb8..c5d31da 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
@@ -22,15 +22,14 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
 
 @RunWith(classOf[Parameterized])
 class CalcITCase(
@@ -70,7 +69,7 @@ class CalcITCase(
   }
 
   @Test
-  def TestCalcWithAggregation(): Unit = {
+  def testCalcWithAggregation(): Unit = {
     
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).toTable

http://git-wip-us.apache.org/repos/asf/flink/blob/2b36401d/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 624a168..2dfdb2c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -25,7 +25,6 @@ import org.apache.flink.api.table.Row
 import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._


Mime
View raw message