Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C1415183B4 for ; Fri, 18 Mar 2016 13:48:03 +0000 (UTC) Received: (qmail 44513 invoked by uid 500); 18 Mar 2016 13:48:03 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 44422 invoked by uid 500); 18 Mar 2016 13:48:03 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 43198 invoked by uid 99); 18 Mar 2016 13:47:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Mar 2016 13:47:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DD3DAE8E6C; Fri, 18 Mar 2016 13:47:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vasia@apache.org To: commits@flink.apache.org Date: Fri, 18 Mar 2016 13:48:19 -0000 Message-Id: <9573acc84b574d768e271287ca981f6b@git.apache.org> In-Reply-To: <23694d950bbb4ace842cfa98be8f6e58@git.apache.org> References: <23694d950bbb4ace842cfa98be8f6e58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [25/50] [abbrv] flink git commit: [FLINK-3226] Improvements for expected types [FLINK-3226] Improvements for expected types This closes #1709 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5d6a564 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5d6a564 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5d6a564 Branch: refs/heads/master Commit: b5d6a564895095459c368cad6069df07b7c0382b Parents: 3e3f076 Author: twalthr Authored: Thu Feb 25 09:36:32 2016 +0100 Committer: vasia Committed: Fri Mar 18 14:44:50 2016 +0100 ---------------------------------------------------------------------- .../flink/api/table/plan/TypeConverter.scala | 20 ++++++++++++++++++++ .../plan/nodes/dataset/DataSetFlatMap.scala | 2 +- .../plan/nodes/dataset/DataSetGroupReduce.scala | 18 +++++++++++++----- .../table/plan/nodes/dataset/DataSetRel.scala | 10 +++++++++- .../flink/api/scala/table/test/CalcITCase.scala | 7 +++---- .../api/scala/table/test/FilterITCase.scala | 1 - 6 files changed, 46 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b5d6a564/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala index 8a77b3b..c8e999d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala @@ -37,6 +37,8 @@ import scala.collection.JavaConversions._ object TypeConverter { + val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]] + def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { case BOOLEAN_TYPE_INFO => BOOLEAN case BOOLEAN_VALUE_TYPE_INFO => BOOLEAN @@ -83,6 +85,24 @@ object TypeConverter { ??? // TODO more types } + /** + * Determines the return type of Flink operators based on the logical fields, the expected + * physical type and configuration parameters. + * + * For example: + * - No physical type expected, only 3 non-null fields and efficient type usage enabled + * -> return Tuple3 + * - No physical type expected, efficient type usage enabled, but 3 nullable fields + * -> return Row because Tuple does not support null values + * - Physical type expected + * -> check if physical type is compatible and return it + * + * @param logicalRowType logical row information + * @param expectedPhysicalType expected physical type + * @param nullable fields can be nullable + * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types) + * @return suitable return type + */ def determineReturnType( logicalRowType: RelDataType, expectedPhysicalType: Option[TypeInformation[Any]], http://git-wip-us.apache.org/repos/asf/flink/blob/b5d6a564/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala index da49ec8..9744792 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala @@ -62,7 +62,7 @@ class DataSetFlatMap( override def translateToPlan(config: TableConfig, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType) + val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config) val returnType = determineReturnType( getRowType, expectedType, http://git-wip-us.apache.org/repos/asf/flink/blob/b5d6a564/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala index afe09bb..af72522 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala @@ -24,12 +24,11 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.common.functions.GroupReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.{TableConfig, Row} -import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter} import org.apache.flink.api.table.typeinfo.RowTypeInfo -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.{Row, TableConfig} + import scala.collection.JavaConverters._ -import org.apache.flink.api.table.plan.TypeConverter /** * Flink RelNode which matches along with ReduceGroupOperator. @@ -67,7 +66,16 @@ class DataSetGroupReduce( config: TableConfig, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType) + expectedType match { + case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => + throw new PlanGenException("GroupReduce operations currently only support returning Rows.") + case _ => // ok + } + + val inputDS = input.asInstanceOf[DataSetRel].translateToPlan( + config, + // tell the input operator that this operator currently only supports Rows as input + Some(TypeConverter.DEFAULT_ROW_TYPE)) // get the output types val fieldsNames = rowType.getFieldNames http://git-wip-us.apache.org/repos/asf/flink/blob/b5d6a564/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala index 16a0ae3..35e23f7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala @@ -26,7 +26,15 @@ import org.apache.flink.api.table.TableConfig trait DataSetRel extends RelNode { /** - * Translate the FlinkRelNode into Flink operator. + * Translates the FlinkRelNode into a Flink operator. + * + * @param config runtime configuration + * @param expectedType specifies the type the Flink operator should return. The type must + * have the same arity as the result. For instance, if the + * expected type is a RowTypeInfo this method will return a DataSet of + * type Row. If the expected type is Tuple2, the operator will return + * a Tuple2 if possible. Row otherwise. + * @return DataSet of type expectedType or RowTypeInfo */ def translateToPlan( config: TableConfig, http://git-wip-us.apache.org/repos/asf/flink/blob/b5d6a564/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala index ebbecb8..c5d31da 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala @@ -22,15 +22,14 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.Row -import org.apache.flink.test.util.MultipleProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized import scala.collection.JavaConverters._ -import org.apache.flink.api.table.test.TableProgramsTestBase -import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode @RunWith(classOf[Parameterized]) class CalcITCase( @@ -70,7 +69,7 @@ class CalcITCase( } @Test - def TestCalcWithAggregation(): Unit = { + def testCalcWithAggregation(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = CollectionDataSets.get3TupleDataSet(env).toTable http://git-wip-us.apache.org/repos/asf/flink/blob/b5d6a564/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala index 624a168..2dfdb2c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala @@ -25,7 +25,6 @@ import org.apache.flink.api.table.Row import org.apache.flink.api.table.expressions.Literal import org.apache.flink.api.table.test.utils.TableProgramsTestBase import TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.table.test.utils.TableProgramsTestBase import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._